From 36d5ddcb310bfde48b968c14af9e3fdbef4e7381 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Fri, 19 Sep 2025 15:32:20 +0900 Subject: [PATCH 01/16] feat: keep ssrc when server down --- .../2025-09-19-060609_create_streams/down.sql | 2 + .../2025-09-19-060609_create_streams/up.sql | 9 ++++ apps/server/src/db/models.rs | 23 +++++++++- apps/server/src/db/repository/mod.rs | 1 + apps/server/src/db/repository/streams.rs | 36 +++++++++++++++ apps/server/src/db/schema.rs | 11 +++++ apps/server/src/handler/streams.rs | 41 +++++++++++++++-- apps/server/src/initial.rs | 46 ++++++++++++++++++- apps/server/src/main.rs | 14 ++++-- apps/server/src/rtp.rs | 25 ++++------ 10 files changed, 183 insertions(+), 25 deletions(-) create mode 100644 apps/server/migrations/2025-09-19-060609_create_streams/down.sql create mode 100644 apps/server/migrations/2025-09-19-060609_create_streams/up.sql create mode 100644 apps/server/src/db/repository/streams.rs diff --git a/apps/server/migrations/2025-09-19-060609_create_streams/down.sql b/apps/server/migrations/2025-09-19-060609_create_streams/down.sql new file mode 100644 index 0000000..004aa50 --- /dev/null +++ b/apps/server/migrations/2025-09-19-060609_create_streams/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE streams; diff --git a/apps/server/migrations/2025-09-19-060609_create_streams/up.sql b/apps/server/migrations/2025-09-19-060609_create_streams/up.sql new file mode 100644 index 0000000..f20f093 --- /dev/null +++ b/apps/server/migrations/2025-09-19-060609_create_streams/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here +CREATE TABLE streams ( + ssrc INTEGER PRIMARY KEY NOT NULL, + topic TEXT NOT NULL, + device_id TEXT NOT NULL, + media_type TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE(topic, device_id) +); \ No newline at end of file diff --git a/apps/server/src/db/models.rs b/apps/server/src/db/models.rs index dd68b3e..515c978 100644 --- a/apps/server/src/db/models.rs +++ b/apps/server/src/db/models.rs @@ -1,6 +1,7 @@ use crate::db::schema::{ device_tokens, devices, entities, entities_configurations, events, flow_versions, flows, - map_features, map_layers, map_vertices, states, states_meta, system_configurations, users, + map_features, map_layers, map_vertices, states, states_meta, streams, system_configurations, + users, }; use chrono::NaiveDateTime; use diesel::prelude::*; @@ -465,3 +466,23 @@ pub struct NewCustomNode<'a> { pub struct UpdateCustomNode { pub data: Option, } + +#[derive(Queryable, Selectable, Insertable, AsChangeset, Serialize, Deserialize, Debug)] +#[diesel(table_name = streams)] +#[diesel(primary_key(ssrc))] +pub struct Stream { + pub ssrc: i32, + pub topic: String, + pub device_id: String, + pub media_type: String, + pub created_at: NaiveDateTime, +} + +#[derive(Insertable)] +#[diesel(table_name = streams)] +pub struct NewStream<'a> { + pub ssrc: i32, + pub topic: &'a str, + pub device_id: &'a str, + pub media_type: &'a str, +} diff --git a/apps/server/src/db/repository/mod.rs b/apps/server/src/db/repository/mod.rs index cbc6701..0eeee45 100644 --- a/apps/server/src/db/repository/mod.rs +++ b/apps/server/src/db/repository/mod.rs @@ -1,4 +1,5 @@ pub mod rbac; +pub mod streams; use std::collections::HashMap; diff --git a/apps/server/src/db/repository/streams.rs b/apps/server/src/db/repository/streams.rs new file mode 100644 index 0000000..6a286ab --- /dev/null +++ b/apps/server/src/db/repository/streams.rs @@ -0,0 +1,36 @@ +use std::collections::HashMap; + +use crate::{ + db::{ + models::{ + NewPermission, NewRole, NewRolePermission, NewStream, NewUserRole, Permission, Role, + RolePermission, RoleWithPermissions, Stream, + }, + schema::streams, + }, + state::DbPool, +}; +use diesel::prelude::*; + +pub fn get_all_streams(pool: &DbPool) -> Result, anyhow::Error> { + use crate::db::schema::streams::dsl::*; + + let mut conn = pool.get()?; + let all_streams = streams.select(Stream::as_select()).load(&mut conn)?; + Ok(all_streams) +} + +pub fn upsert_stream(pool: &DbPool, new_stream: &NewStream) -> Result { + use crate::db::schema::streams; + + let mut conn = pool.get()?; + + let stream = diesel::insert_into(streams::table) + .values(new_stream) + .on_conflict((streams::topic, streams::device_id)) + .do_update() + .set(streams::ssrc.eq(new_stream.ssrc)) + .get_result(&mut conn)?; + + Ok(stream) +} diff --git a/apps/server/src/db/schema.rs b/apps/server/src/db/schema.rs index 2194a5a..bc2c8b0 100644 --- a/apps/server/src/db/schema.rs +++ b/apps/server/src/db/schema.rs @@ -159,6 +159,16 @@ diesel::table! { } } +diesel::table! { + streams (ssrc) { + ssrc -> Integer, + topic -> Text, + device_id -> Text, + media_type -> Text, + created_at -> Timestamp, + } +} + diesel::table! { system_configurations (id) { id -> Integer, @@ -220,6 +230,7 @@ diesel::allow_tables_to_appear_in_same_query!( roles, states, states_meta, + streams, system_configurations, user_roles, users, diff --git a/apps/server/src/handler/streams.rs b/apps/server/src/handler/streams.rs index 41fdefe..d8a8891 100644 --- a/apps/server/src/handler/streams.rs +++ b/apps/server/src/handler/streams.rs @@ -10,6 +10,7 @@ use tokio::{ use tracing::info; use crate::{ + db::{self, models::NewStream}, handler::auth::DeviceTokenAuth, state::{AppState, MediaType, StreamInfo}, }; @@ -33,6 +34,26 @@ pub async fn register_stream( ) -> impl IntoResponse { let ssrc = rand::rngs::ThreadRng::default().random_range(0..=i32::MAX) as u32; + let media_type_str = match payload.media_type { + MediaType::Audio => "audio", + MediaType::Video => "video", + }; + + let new_stream_db = NewStream { + ssrc: ssrc as i32, + topic: &payload.topic, + device_id: &auth.device_id, + media_type: media_type_str, + }; + + let db_stream = match db::repository::streams::upsert_stream(&state.pool, &new_stream_db) { + Ok(stream) => stream, + Err(e) => { + tracing::error!("Failed to upsert stream to database: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR).into_response(); + } + }; + let (packet_tx, _) = broadcast::channel::(1024); let stream_info = StreamInfo { @@ -58,11 +79,23 @@ pub async fn register_stream( payload.topic, ssrc ); + let configs = match db::repository::get_all_system_configs(&state.pool) { + Ok(configs) => configs, + Err(e) => { + tracing::error!("Failed to get system configs: {}", e); + return (StatusCode::INTERNAL_SERVER_ERROR).into_response(); + } + }; + + let rtp_port = configs + .iter() + .find(|c| c.key == "rtp_broker_port") + .and_then(|c| c.value.parse::().ok()) + .unwrap_or(5004); + ( StatusCode::OK, - Json(RegisterStreamResponse { - ssrc, - rtp_port: 5004, - }), + Json(RegisterStreamResponse { ssrc, rtp_port }), ) + .into_response() } diff --git a/apps/server/src/initial.rs b/apps/server/src/initial.rs index f344767..e453f98 100644 --- a/apps/server/src/initial.rs +++ b/apps/server/src/initial.rs @@ -1,9 +1,17 @@ +use std::sync::Arc; + +use dashmap::DashMap; use diesel::{ ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper, SqliteConnection, }; -use tracing::info; +use rtp::packet::Packet; +use tokio::sync::{broadcast, RwLock}; +use tokio::time::Instant; +use tracing::{error, info, warn}; use crate::db::models::{NewPermission, Permission, User}; +use crate::db::repository::{self, streams}; +use crate::state::{DbPool, MediaType, StreamInfo}; use crate::{ db::models::{NewSystemConfiguration, NewUser, SystemConfiguration}, hash::hash_password, @@ -156,3 +164,39 @@ pub fn seed_initial_permissions(conn: &mut SqliteConnection) { } info!("Permission seeding complete."); } + +pub fn create_hydrate_streams(pool: &DbPool, streams: &Arc>) { + info!("Hydrating streams from database..."); + match repository::streams::get_all_streams(&pool) { + Ok(db_streams) => { + for stream in db_streams { + let (packet_tx, _) = broadcast::channel::(1024); + let media_type = match stream.media_type.as_str() { + "audio" => MediaType::Audio, + "video" => MediaType::Video, + _ => { + warn!( + "Unknown media type '{}' for SSRC {}", + stream.media_type, stream.ssrc + ); + continue; + } + }; + + let stream_info = StreamInfo { + topic: stream.topic, + user_id: stream.device_id, + packet_tx, + media_type, + last_seen: Arc::new(RwLock::new(Instant::now())), + is_online: Arc::new(RwLock::new(false)), + }; + streams.insert(stream.ssrc as u32, stream_info); + } + info!("Hydrated {} streams into memory.", streams.len()); + } + Err(e) => { + error!("Failed to hydrate streams from database: {}", e); + } + } +} diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index 46cfe99..51cd146 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -1,3 +1,4 @@ +use ::rtp::packet::Packet; use anyhow::Result; use dashmap::DashMap; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; @@ -6,6 +7,7 @@ use std::{env, sync::Arc}; use tokio::{ sync::{broadcast, mpsc, watch, RwLock}, task::JoinSet, + time::Instant, }; use tracing::{error, info, warn}; use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; @@ -15,12 +17,15 @@ const LOG_FILE_PATH: &str = "log/app.log"; use crate::{ db::conn::establish_connection, flow::manager_state::FlowManagerActor, - initial::{create_initial_admin, create_initial_configurations, seed_initial_permissions}, + initial::{ + create_hydrate_streams, create_initial_admin, create_initial_configurations, + seed_initial_permissions, + }, lib::{entity_map::remap_topics, stream_checker::stream_status_checker}, logo::print_logo, routes::web_server, rtp::rtp_receiver, - state::{AppState, FrameData, MqttMessage, StreamInfo, StreamManager}, + state::{AppState, FrameData, MediaType, MqttMessage, StreamInfo, StreamManager}, }; mod config; @@ -73,6 +78,8 @@ async fn main() -> Result<()> { warn!("APPLICATION IS RUNNING IN DEBUG MODE. ONLY THE WEB SERVER WILL BE ACTIVATED."); } + let streams: Arc> = Arc::new(DashMap::::new()); + let pool = establish_connection(&settings.database_url); { @@ -86,10 +93,9 @@ async fn main() -> Result<()> { create_initial_admin(&mut conn); create_initial_configurations(&mut conn); seed_initial_permissions(&mut conn); + create_hydrate_streams(&pool, &streams); } - let streams = Arc::new(DashMap::::new()); - let (mqtt_tx, _) = broadcast::channel::(1024); let (rtsp_frame_tx, _) = broadcast::channel::(256); diff --git a/apps/server/src/rtp.rs b/apps/server/src/rtp.rs index cdd967c..3be515a 100644 --- a/apps/server/src/rtp.rs +++ b/apps/server/src/rtp.rs @@ -1,13 +1,9 @@ use anyhow::Result; use tokio::{net::UdpSocket, time::Instant}; -use tracing::{ info, warn}; -use webrtc::{ - rtp::packet::Packet, - util::Unmarshal, -}; - -use crate::{ state::{ StreamManager}}; +use tracing::{info, warn}; +use webrtc::{rtp::packet::Packet, util::Unmarshal}; +use crate::state::StreamManager; pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result<()> { let sock = UdpSocket::bind(&addr).await?; @@ -26,12 +22,15 @@ pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result match Packet::unmarshal(&mut &buf[..n]) { Ok(packet) => { let ssrc = packet.header.ssrc; - + if let Some(stream_info) = stream_manager.get(&ssrc) { let mut is_online_guard = stream_info.is_online.write().await; if !*is_online_guard { *is_online_guard = true; - info!("Topic '{}' (SSRC: {}) is now ONLINE.", &stream_info.topic, ssrc); + info!( + "Topic '{}' (SSRC: {}) is now ONLINE.", + &stream_info.topic, ssrc + ); } drop(is_online_guard); @@ -39,11 +38,7 @@ pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result *last_seen_guard = Instant::now(); drop(last_seen_guard); - if stream_info.value().packet_tx.send(packet).is_err() { - - } - } else { - warn!("Received packet from {} with unknown SSRC: {}", from, ssrc); + if stream_info.value().packet_tx.send(packet).is_err() {} } } Err(e) => { @@ -51,4 +46,4 @@ pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result } } } -} \ No newline at end of file +} From 585129eed55898bece4014fa6b0b038d58be59da Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Fri, 19 Sep 2025 16:38:28 +0900 Subject: [PATCH 02/16] feat: set limit of max webrtc connection --- .../src/features/rtc/StreamReceiver.tsx | 24 +++++-- .../src/features/rtc/WebRTCProvider.tsx | 33 ++++++++-- apps/client/src/features/rtc/rtc.ts | 64 +++++++++++++------ apps/sentinel-go/stream-test/audio.go | 3 +- 4 files changed, 96 insertions(+), 28 deletions(-) diff --git a/apps/client/src/features/rtc/StreamReceiver.tsx b/apps/client/src/features/rtc/StreamReceiver.tsx index a9e5fe5..c3ddba3 100644 --- a/apps/client/src/features/rtc/StreamReceiver.tsx +++ b/apps/client/src/features/rtc/StreamReceiver.tsx @@ -1,6 +1,7 @@ import { useEffect, useRef, useMemo } from "react"; import { Button } from "@/components/ui/button"; import { useWebRTC } from "./WebRTCProvider"; +import { WebRTCManager } from "./rtc"; type StreamReceiverProps = { topic: string; @@ -9,12 +10,15 @@ type StreamReceiverProps = { export function StreamReceiver({ topic, streamType }: StreamReceiverProps) { const mediaRef = useRef(null); - const { rtcManager, streams } = useWebRTC(); + const { rtcManager, streams, audioStreamCount, videoStreamCount } = + useWebRTC(); - const stream = useMemo(() => { + const streamInfo = useMemo(() => { return streams.get(topic) || null; }, [streams, topic]); + const stream = streamInfo?.stream || null; + useEffect(() => { if (stream && mediaRef.current) { mediaRef.current.srcObject = stream; @@ -26,16 +30,28 @@ export function StreamReceiver({ topic, streamType }: StreamReceiverProps) { const handleSubscribe = () => { if (rtcManager) { - rtcManager.subscribe(topic); + rtcManager.subscribe(topic, streamType); } else { console.error("RTCManager is not ready."); } }; + const isLimitReached = + (streamType === "audio" && + audioStreamCount >= WebRTCManager.MAX_AUDIO_STREAMS) || + (streamType === "video" && + videoStreamCount >= WebRTCManager.MAX_VIDEO_STREAMS); + if (!stream) { return ( - ); } diff --git a/apps/client/src/features/rtc/WebRTCProvider.tsx b/apps/client/src/features/rtc/WebRTCProvider.tsx index 77648d1..d35e059 100644 --- a/apps/client/src/features/rtc/WebRTCProvider.tsx +++ b/apps/client/src/features/rtc/WebRTCProvider.tsx @@ -4,13 +4,21 @@ import { useEffect, useState, ReactNode, + useMemo, } from "react"; import { useWebSocket } from "../ws/WebSocketProvider"; import { WebRTCManager } from "./rtc"; +type StreamInfo = { + stream: MediaStream; + type: "audio" | "video"; +}; + type WebRTCContextType = { rtcManager: WebRTCManager | null; - streams: Map; + streams: Map; + audioStreamCount: number; + videoStreamCount: number; }; const WebRTCContext = createContext(undefined); @@ -30,13 +38,13 @@ type WebRTCProviderProps = { export function WebRTCProvider({ children }: WebRTCProviderProps) { const { wsManager, isConnected } = useWebSocket(); const [rtcManager, setRtcManager] = useState(null); - const [streams, setStreams] = useState>(new Map()); + const [streams, setStreams] = useState>(new Map()); useEffect(() => { if (isConnected && wsManager) { console.log("WebSocket connected, initializing WebRTCManager."); - const onStreamsChanged = (newStreams: Map) => { + const onStreamsChanged = (newStreams: Map) => { setStreams(newStreams); }; @@ -51,7 +59,24 @@ export function WebRTCProvider({ children }: WebRTCProviderProps) { } }, [isConnected, wsManager]); - const value = { rtcManager, streams }; + const { audioStreamCount, videoStreamCount } = useMemo(() => { + const counts = { audioStreamCount: 0, videoStreamCount: 0 }; + for (const streamInfo of streams.values()) { + if (streamInfo.type === "audio") { + counts.audioStreamCount++; + } else { + counts.videoStreamCount++; + } + } + return counts; + }, [streams]); + + const value = { + rtcManager, + streams, + audioStreamCount, + videoStreamCount, + }; return ( {children} diff --git a/apps/client/src/features/rtc/rtc.ts b/apps/client/src/features/rtc/rtc.ts index ae1a369..cca7996 100644 --- a/apps/client/src/features/rtc/rtc.ts +++ b/apps/client/src/features/rtc/rtc.ts @@ -3,14 +3,21 @@ import { WebSocketChannel, WebSocketMessage } from "../ws/ws"; export class WebRTCManager { private pc: RTCPeerConnection; private signaling: WebSocketChannel; - public streams: Map; - private onStreamsChanged: (streams: Map) => void; - private pendingTopics: string[]; + public streams: Map; + private onStreamsChanged: ( + streams: Map, + ) => void; + private pendingTopics: Array<{ topic: string; type: "audio" | "video" }>; private candidateQueue: RTCIceCandidateInit[] = []; + public static readonly MAX_AUDIO_STREAMS = 2; + public static readonly MAX_VIDEO_STREAMS = 4; + constructor( signaling: WebSocketChannel, - onStreamsChanged: (streams: Map) => void, + onStreamsChanged: ( + streams: Map, + ) => void, ) { this.signaling = signaling; this.streams = new Map(); @@ -35,20 +42,19 @@ export class WebRTCManager { }; this.pc.ontrack = (event) => { - const topic = this.pendingTopics.shift(); - if (!topic) { + const pending = this.pendingTopics.shift(); + if (!pending) { return; } + const { topic, type } = pending; const track = event.track; const stream = new MediaStream([track]); - console.log( - `Track received for topic "${topic}": ${track.kind}, New Stream ID: ${stream.id}`, - ); + console.log(` "${topic}": ${track.kind} : ${stream.id}`); if (!this.streams.has(topic)) { - this.streams.set(topic, stream); + this.streams.set(topic, { stream, type }); this.onStreamsChanged(new Map(this.streams)); } }; @@ -111,11 +117,12 @@ export class WebRTCManager { public async connect(): Promise { try { - this.pc.addTransceiver("audio", { direction: "recvonly" }); - this.pc.addTransceiver("video", { direction: "recvonly" }); - this.pc.addTransceiver("video", { direction: "recvonly" }); - this.pc.addTransceiver("video", { direction: "recvonly" }); - this.pc.addTransceiver("video", { direction: "recvonly" }); + for (let i = 0; i < WebRTCManager.MAX_AUDIO_STREAMS; i++) { + this.pc.addTransceiver("audio", { direction: "recvonly" }); + } + for (let i = 0; i < WebRTCManager.MAX_VIDEO_STREAMS; i++) { + this.pc.addTransceiver("video", { direction: "recvonly" }); + } const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); @@ -129,9 +136,29 @@ export class WebRTCManager { } } - public subscribe(topic: string): void { - console.log(`Subscribing to topic: ${topic}`); - this.pendingTopics.push(topic); + public subscribe(topic: string, streamType: "audio" | "video"): void { + const currentStreams = Array.from(this.streams.values()); + const audioCount = currentStreams.filter((s) => s.type === "audio").length; + const videoCount = currentStreams.filter((s) => s.type === "video").length; + + if ( + streamType === "audio" && + audioCount >= WebRTCManager.MAX_AUDIO_STREAMS + ) { + console.warn("Max audio streams reached. Cannot subscribe."); + return; + } + + if ( + streamType === "video" && + videoCount >= WebRTCManager.MAX_VIDEO_STREAMS + ) { + console.warn("Max video streams reached. Cannot subscribe."); + return; + } + + console.log(`Subscribing to topic: ${topic} (${streamType})`); + this.pendingTopics.push({ topic, type: streamType }); this.signaling.send({ type: "subscribe_stream", @@ -148,6 +175,5 @@ export class WebRTCManager { } this.streams.clear(); this.onStreamsChanged(new Map(this.streams)); - console.log("WebRTCManager closed."); } } diff --git a/apps/sentinel-go/stream-test/audio.go b/apps/sentinel-go/stream-test/audio.go index f4a38ce..eadd9de 100644 --- a/apps/sentinel-go/stream-test/audio.go +++ b/apps/sentinel-go/stream-test/audio.go @@ -23,7 +23,7 @@ func main() { deviceToken := "FnhXd7dNy8iCPbu5N5jS2v_NaOYCiI9AqPO4FQQed7E" serverURL := "http://127.0.0.1:8080/api/streams/register" - topic := "go_stream_1" + topic := "go_stream_2" mediaType := "audio" if deviceId == "" || deviceToken == "" { @@ -78,6 +78,7 @@ func main() { cmd := exec.Command( "ffmpeg", "-re", + "-stream_loop", "-1", "-i", "./sample.mp3", "-vn", "-map", "0:a:0", From ef5644c7a79ff3b1d5473c17780c44d49fffd7bd Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Fri, 19 Sep 2025 16:43:09 +0900 Subject: [PATCH 03/16] fix: split gps feature --- apps/client/src/features/gps/parseGps.ts | 17 +++++++++++++++++ apps/client/src/features/map/index.tsx | 19 +------------------ 2 files changed, 18 insertions(+), 18 deletions(-) create mode 100644 apps/client/src/features/gps/parseGps.ts diff --git a/apps/client/src/features/gps/parseGps.ts b/apps/client/src/features/gps/parseGps.ts new file mode 100644 index 0000000..4eb7b71 --- /dev/null +++ b/apps/client/src/features/gps/parseGps.ts @@ -0,0 +1,17 @@ +export const parseGpsState = ( + state: string | null | undefined, +): [number, number] | null => { + if (!state) return null; + + const latMatch = state.match(/lat=([-\d.]+)/); + const lngMatch = state.match(/lng=([-\d.]+)/); + + if (latMatch && lngMatch && latMatch[1] && lngMatch[1]) { + const lat = parseFloat(latMatch[1]); + const lng = parseFloat(lngMatch[1]); + if (!isNaN(lat) && !isNaN(lng)) { + return [lat, lng]; + } + } + return null; +}; diff --git a/apps/client/src/features/map/index.tsx b/apps/client/src/features/map/index.tsx index 4103a79..0ebca24 100644 --- a/apps/client/src/features/map/index.tsx +++ b/apps/client/src/features/map/index.tsx @@ -15,6 +15,7 @@ import { DrawingPreview } from "./FeatureDrawingPreview"; import { FeatureRenderer } from "./FeatureRenderer"; import { FeatureEditor } from "./FeatureEditor"; import { FeatureDetailsPanel } from "./FeatureDetailsPanel"; +import { parseGpsState } from "../gps/parseGps"; const DefaultIcon = L.icon({ iconUrl: icon, @@ -27,24 +28,6 @@ const failedPosition = [39.8283, -98.5795]; L.Marker.prototype.options.icon = DefaultIcon; -const parseGpsState = ( - state: string | null | undefined, -): [number, number] | null => { - if (!state) return null; - - const latMatch = state.match(/lat=([-\d.]+)/); - const lngMatch = state.match(/lng=([-\d.]+)/); - - if (latMatch && lngMatch && latMatch[1] && lngMatch[1]) { - const lat = parseFloat(latMatch[1]); - const lng = parseFloat(lngMatch[1]); - if (!isNaN(lat) && !isNaN(lng)) { - return [lat, lng]; - } - } - return null; -}; - export function MapView() { const [position, setPosition] = useState<[number, number] | null>(null); const [entities, setEntities] = useState([]); From 8333c574451ba5a6f670196a6a571c7e6d4fdf68 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Fri, 19 Sep 2025 17:06:38 +0900 Subject: [PATCH 04/16] feat: set json data in flow --- apps/client/src/features/flow/Graph.tsx | 2 + apps/client/src/features/flow/flowNode.ts | 14 ++++++ apps/server/src/flow/engine.rs | 2 + apps/server/src/flow/nodes/json_modify.rs | 59 +++++++++++++++++++++++ apps/server/src/flow/nodes/mod.rs | 1 + 5 files changed, 78 insertions(+) create mode 100644 apps/server/src/flow/nodes/json_modify.rs diff --git a/apps/client/src/features/flow/Graph.tsx b/apps/client/src/features/flow/Graph.tsx index 7985b7e..43cc820 100644 --- a/apps/client/src/features/flow/Graph.tsx +++ b/apps/client/src/features/flow/Graph.tsx @@ -105,6 +105,7 @@ export function Graph({ BRANCH: (g, d) => renderProcessingNode(g, d), JSON_SELECTOR: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), + JSON_MODIFY: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), YOLO_DETECT: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), GST_DECODER: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), @@ -137,6 +138,7 @@ export function Graph({ "INTERVAL", "BRANCH", "JSON_SELECTOR", + "JSON_MODIFY", ], }, { diff --git a/apps/client/src/features/flow/flowNode.ts b/apps/client/src/features/flow/flowNode.ts index a1bfd32..521a0c7 100644 --- a/apps/client/src/features/flow/flowNode.ts +++ b/apps/client/src/features/flow/flowNode.ts @@ -218,6 +218,20 @@ export const DEFINITION_NODE = { path: "STRING", }, }, + JSON_MODIFY: { + connectors: [ + { id: `id`, name: "json", type: "in" }, + { id: `id`, name: "data", type: "in" }, + { id: `id`, name: "value", type: "out" }, + ], + nodeType: "JSON_MODIFY", + data: { + path: "path.to.value", + }, + dataType: { + path: "STRING", + }, + }, DECODE_H264: { connectors: [ { id: `id`, name: "payload", type: "in" }, diff --git a/apps/server/src/flow/engine.rs b/apps/server/src/flow/engine.rs index 43a6f91..ed54447 100644 --- a/apps/server/src/flow/engine.rs +++ b/apps/server/src/flow/engine.rs @@ -16,6 +16,7 @@ use crate::flow::nodes::custom_node::CustomNode; use crate::flow::nodes::decode_h264::DecodeH264Node; use crate::flow::nodes::decode_opus::DecodeOpusNode; use crate::flow::nodes::gst_decoder::GstDecoderNode; +use crate::flow::nodes::json_modify::JsonModifyNode; use crate::flow::nodes::json_selector::JsonSelectorNode; use crate::flow::nodes::mqtt_publish::MqttPublishNode; use crate::flow::nodes::mqtt_subscribe::MqttSubscribeNode; @@ -215,6 +216,7 @@ impl FlowEngine { "DECODE_OPUS" => Ok(Box::new(DecodeOpusNode::new()?)), "BRANCH" => Ok(Box::new(BranchNode)), "JSON_SELECTOR" => Ok(Box::new(JsonSelectorNode::new(&node.data)?)), + "JSON_MODIFY" => Ok(Box::new(JsonModifyNode::new(&node.data)?)), "DECODE_H264" => Ok(Box::new(DecodeH264Node::new()?)), "YOLO_DETECT" => Ok(Box::new(YoloDetectNode::new( &node.data, diff --git a/apps/server/src/flow/nodes/json_modify.rs b/apps/server/src/flow/nodes/json_modify.rs new file mode 100644 index 0000000..d423c26 --- /dev/null +++ b/apps/server/src/flow/nodes/json_modify.rs @@ -0,0 +1,59 @@ +use crate::flow::engine::ExecutionContext; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use serde::Deserialize; +use serde_json::Value; +use std::collections::HashMap; + +use super::{ExecutableNode, ExecutionResult}; + +#[derive(Deserialize, Debug)] +struct JsonModifyData { + path: String, +} + +pub struct JsonModifyNode { + data: JsonModifyData, +} + +impl JsonModifyNode { + pub fn new(node_data: &Value) -> Result { + let data: JsonModifyData = serde_json::from_value(node_data.clone())?; + Ok(Self { data }) + } +} + +#[async_trait] +impl ExecutableNode for JsonModifyNode { + async fn execute( + &self, + _context: &mut ExecutionContext, + inputs: HashMap, + ) -> Result { + let mut json_input = inputs + .get("json") + .cloned() + .ok_or_else(|| anyhow!("'json' input is missing for JSON_Modify node"))?; + + let data_input = inputs + .get("data") + .cloned() + .ok_or_else(|| anyhow!("'data' input is missing for JSON_Set node"))?; + + let json_pointer_path = format!("/{}", self.data.path.replace('.', "/")); + + let target = json_input + .pointer_mut(&json_pointer_path) + .ok_or_else(|| anyhow!("Path '{}' not found in the input json", self.data.path))?; + + *target = data_input; + + let mut outputs = HashMap::new(); + outputs.insert("value".to_string(), json_input); + + Ok(ExecutionResult { + outputs, + ..Default::default() + }) + } +} diff --git a/apps/server/src/flow/nodes/mod.rs b/apps/server/src/flow/nodes/mod.rs index 6b90713..d04f98a 100644 --- a/apps/server/src/flow/nodes/mod.rs +++ b/apps/server/src/flow/nodes/mod.rs @@ -41,6 +41,7 @@ pub mod decode_opus; pub mod gst_decoder; pub mod http; pub mod interval; +pub mod json_modify; pub mod json_selector; pub mod log_message; pub mod logic_operator; From 511cff82e179091b4041d340b94a290fe552111b Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Fri, 19 Sep 2025 17:40:10 +0900 Subject: [PATCH 05/16] refactor: split map entity --- apps/client/src/features/code/FileTree.tsx | 6 +-- apps/client/src/features/flow/Flow.tsx | 2 +- .../EntityDetailsPanel.tsx | 4 +- .../client/src/features/map-entity/render.tsx | 47 +++++++++++++++++++ .../src/features/{map => map-entity}/store.ts | 4 +- .../src/features/map/FeatureDetailsPanel.tsx | 6 +-- apps/client/src/features/map/index.tsx | 36 ++------------ apps/client/src/pages/code/index.tsx | 4 +- apps/client/src/pages/map/index.tsx | 2 +- 9 files changed, 64 insertions(+), 47 deletions(-) rename apps/client/src/features/{map => map-entity}/EntityDetailsPanel.tsx (98%) create mode 100644 apps/client/src/features/map-entity/render.tsx rename apps/client/src/features/{map => map-entity}/store.ts (74%) diff --git a/apps/client/src/features/code/FileTree.tsx b/apps/client/src/features/code/FileTree.tsx index 212d84d..f0dce30 100644 --- a/apps/client/src/features/code/FileTree.tsx +++ b/apps/client/src/features/code/FileTree.tsx @@ -140,7 +140,7 @@ const TreeNode: React.FC = ({ entry, onDeleteRequest }) => { > {entry.isDir ? ( @@ -171,7 +171,7 @@ const TreeNode: React.FC = ({ entry, onDeleteRequest }) => { onDeleteRequest(entry)} - className='text-destructive focus:text-destructive' + className='text-red-500 focus:text-red-500' > Delete @@ -283,7 +283,7 @@ export const FileTree = () => { return (
-

Project

+

Project

)} +
+

Style

+
+
+
+ + Color +
+ {isEditing ? ( + setEditableColor(e.target.value)} + className='w-10 h-8 p-1 bg-transparent border rounded-md cursor-pointer' + /> + ) : ( +
+ )} +
+
+
+

Vertices

diff --git a/apps/client/src/features/map-draw/FeatureRenderer.tsx b/apps/client/src/features/map-draw/FeatureRenderer.tsx index c914d5a..6e596da 100644 --- a/apps/client/src/features/map-draw/FeatureRenderer.tsx +++ b/apps/client/src/features/map-draw/FeatureRenderer.tsx @@ -2,6 +2,7 @@ import { CircleMarker, Polygon, Polyline } from "react-leaflet"; import { LatLngExpression } from "leaflet"; import { FeatureWithVertices } from "@/entities/map/types"; import { useMapInteractionStore } from "@/entities/map/store"; +import { useMemo } from "react"; interface FeatureRendererProps { feature: FeatureWithVertices; @@ -15,6 +16,18 @@ export function FeatureRenderer({ feature }: FeatureRendererProps) { .sort((a, b) => a.sequence - b.sequence) .map((v) => [v.latitude, v.longitude] as LatLngExpression); + const featureColor = useMemo(() => { + try { + if (feature.style_properties) { + const styles = JSON.parse(feature.style_properties); + return styles.color || "#6ec7f0"; + } + } catch (e) { + console.error("Failed to parse style_properties", e); + } + return "#6ec7f0"; + }, [feature.style_properties]); + if (positions.length === 0) { return null; } @@ -25,12 +38,14 @@ export function FeatureRenderer({ feature }: FeatureRendererProps) { }, }; + const pathOptions = { color: featureColor }; + switch (feature.feature_type) { case "POINT": return ( @@ -40,7 +55,7 @@ export function FeatureRenderer({ feature }: FeatureRendererProps) { return ( ); @@ -49,7 +64,7 @@ export function FeatureRenderer({ feature }: FeatureRendererProps) { return ( ); From 9479d98d2d2f2c5d816598691547a9ea29883e71 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Mon, 22 Sep 2025 15:41:50 +0900 Subject: [PATCH 10/16] fix: fix mabbox panel --- .../features/map-draw/FeatureDetailsPanel.tsx | 74 +++--- .../map-entity/EntityDetailsPanel.tsx | 228 ++++++++++-------- apps/client/src/features/map/index.tsx | 43 +++- apps/client/src/pages/map/index.tsx | 2 - 4 files changed, 207 insertions(+), 140 deletions(-) diff --git a/apps/client/src/features/map-draw/FeatureDetailsPanel.tsx b/apps/client/src/features/map-draw/FeatureDetailsPanel.tsx index cd9682a..6858589 100644 --- a/apps/client/src/features/map-draw/FeatureDetailsPanel.tsx +++ b/apps/client/src/features/map-draw/FeatureDetailsPanel.tsx @@ -10,6 +10,7 @@ import { Maximize, Ruler, Palette, + Minus, } from "lucide-react"; import { useMapDataStore, useMapInteractionStore } from "@/entities/map/store"; import { Button } from "@/components/ui/button"; @@ -33,7 +34,6 @@ import { import { cn } from "@/lib/utils"; import { MapVertex, UpdateFeaturePayload } from "@/entities/map/types"; import { calculateFeatureGeometry } from "@/lib/geometry-precision"; -import { useMapEntityStore } from "../map-entity/store"; const FeatureIcon = ({ type }: { type: string }) => { switch (type) { @@ -48,9 +48,15 @@ const FeatureIcon = ({ type }: { type: string }) => { } }; -export function FeatureDetailsPanel() { - const { selectedEntity } = useMapEntityStore(); +interface FeatureDetailsPanelProps { + isCollapsed: boolean; + onToggleCollapse: () => void; +} +export function FeatureDetailsPanel({ + isCollapsed, + onToggleCollapse, +}: FeatureDetailsPanelProps) { const { selectedFeature, setSelectedFeature } = useMapInteractionStore(); const { removeFeature, updateFeature } = useMapDataStore(); @@ -128,29 +134,43 @@ export function FeatureDetailsPanel() { } }; + const handleToggleCollapse = (e: React.MouseEvent) => { + e.stopPropagation(); + onToggleCollapse(); + }; + + if (!selectedFeature) { + return null; + } + return ( <> -
- {selectedFeature && ( - - -
- - - Feature #{selectedFeature.id} - - -
-
+ + +
+ + + Feature #{selectedFeature.id} + +
+ + +
+
+
+ {!isCollapsed && ( + <> {geometryInfo && (
@@ -228,7 +248,7 @@ export function FeatureDetailsPanel() {
- + {isEditing ? ( <>
+ diff --git a/apps/client/src/features/map-entity/EntityDetailsPanel.tsx b/apps/client/src/features/map-entity/EntityDetailsPanel.tsx index c54523a..ec2705d 100644 --- a/apps/client/src/features/map-entity/EntityDetailsPanel.tsx +++ b/apps/client/src/features/map-entity/EntityDetailsPanel.tsx @@ -6,7 +6,7 @@ import { CardHeader, CardTitle, } from "@/components/ui/card"; -import { X, MapPin, TabletSmartphone, Server } from "lucide-react"; +import { X, MapPin, TabletSmartphone, Server, Minus } from "lucide-react"; import { useCallback, useEffect, useState } from "react"; import { getDeviceById } from "@/entities/device/api"; import { EntityAll } from "@/entities/entity/types"; @@ -16,8 +16,17 @@ import { WebSocketMessage } from "../ws/ws"; import { ChangeStatePayload, StreamState } from "../entity/AllEntities"; import * as api from "../../entities/entity/api"; import { useMapEntityStore } from "./store"; +import { cn } from "@/lib/utils"; -export function EntityDetailsPanel() { +interface EntityDetailsPanelProps { + isCollapsed: boolean; + onToggleCollapse: () => void; +} + +export function EntityDetailsPanel({ + isCollapsed, + onToggleCollapse, +}: EntityDetailsPanelProps) { const { selectedEntity, setSelectedEntity } = useMapEntityStore(); const [device, setDevice] = useState({ name: "", @@ -112,116 +121,121 @@ export function EntityDetailsPanel() { } }, [selectedEntity]); + const handleToggleCollapse = (e: React.MouseEvent) => { + e.stopPropagation(); + onToggleCollapse(); + }; + + if (!selectedEntity) { + return null; + } + return ( -
- {selectedEntity && ( - - -
-
- - {selectedEntity.friendly_name || selectedEntity.entity_id} - - {selectedEntity.entity_id} -
- -
-
- -
-

State

-
-

- State: {selectedEntity.state?.state || "N/A"} -

-

- Last Updated:{" "} - {selectedEntity.state - ? new Date( - selectedEntity.state.last_updated, - ).toLocaleString() - : "N/A"} -

-
+ + +
+
+ + {selectedEntity.friendly_name || selectedEntity.entity_id} + + {selectedEntity.entity_id} +
+
+ + +
+
+
+ {!isCollapsed && ( + +
+

State

+
+

+ State: {selectedEntity.state?.state || "N/A"} +

+

+ Last Updated:{" "} + {selectedEntity.state + ? new Date(selectedEntity.state.last_updated).toLocaleString() + : "N/A"} +

- +
+ +
+

Details

+
    +
  • + + Type:{" "} + + {selectedEntity.entity_type || "N/A"} + +
  • +
  • + + Platform:{" "} + {selectedEntity.platform || "N/A"} +
  • +
  • + + Device ID:{" "} + + {selectedEntity.device_id ?? "N/A"} + +
  • +
+
+ + {selectedEntity.configuration && (
-

Details

-
    -
  • - - Type:{" "} - - {selectedEntity.entity_type || "N/A"} - -
  • -
  • - - Platform:{" "} - - {selectedEntity.platform || "N/A"} - -
  • -
  • - - Device ID:{" "} - - {selectedEntity.device_id ?? "N/A"} - -
  • -
+

Configuration

+
+                {JSON.stringify(selectedEntity.configuration, null, 2)}
+              
- - {selectedEntity.configuration && ( -
-

Configuration

-
-                  {JSON.stringify(selectedEntity.configuration, null, 2)}
-                
-
- )} - -
-

Device

-
    -
  • - Name:{" "} - {device.name || "N/A"} -
  • -
  • - Model:{" "} - {device.model || "N/A"} -
  • -
  • - Manufacturer:{" "} - {device.manufacturer || "N/A"} -
  • -
+ )} + +
+

Device

+
    +
  • + Name:{" "} + {device.name || "N/A"} +
  • +
  • + Model:{" "} + {device.model || "N/A"} +
  • +
  • + Manufacturer:{" "} + {device.manufacturer || "N/A"} +
  • +
+
+ + {entities.map((item) => ( +
+

+ Entity: {item.friendly_name} +

+ + +
+                {JSON.stringify(item, null, 2)}
+              
- - {entities.map((item) => ( -
-

- Entity: {item.friendly_name} -

- - -
-                  {JSON.stringify(item, null, 2)}
-                
-
- ))} - - + ))} + )} -
+
); } diff --git a/apps/client/src/features/map/index.tsx b/apps/client/src/features/map/index.tsx index a07ac4c..d6750bf 100644 --- a/apps/client/src/features/map/index.tsx +++ b/apps/client/src/features/map/index.tsx @@ -6,7 +6,7 @@ import L from "leaflet"; import icon from "leaflet/dist/images/marker-icon.png"; import iconShadow from "leaflet/dist/images/marker-shadow.png"; import "./style.css"; -import { useMapDataStore } from "@/entities/map/store"; +import { useMapDataStore, useMapInteractionStore } from "@/entities/map/store"; import { MapEntityRender } from "../map-entity/render"; import { FeatureDetailsPanel } from "../map-draw/FeatureDetailsPanel"; @@ -14,6 +14,9 @@ import { DrawingPreview } from "../map-draw/FeatureDrawingPreview"; import { FeatureEditor } from "../map-draw/FeatureEditor"; import { FeatureRenderer } from "../map-draw/FeatureRenderer"; import { MapEvents } from "../map-draw/MapEvents"; +import { EntityDetailsPanel } from "../map-entity/EntityDetailsPanel"; +import { useMapEntityStore } from "../map-entity/store"; +import { cn } from "@/lib/utils"; const DefaultIcon = L.icon({ iconUrl: icon, @@ -29,6 +32,11 @@ L.Marker.prototype.options.icon = DefaultIcon; export function MapView() { const [position, setPosition] = useState<[number, number] | null>(null); const { layer, fetchAllLayers } = useMapDataStore(); + const { selectedFeature } = useMapInteractionStore(); + const { selectedEntity } = useMapEntityStore(); + + const [isFeaturePanelCollapsed, setFeaturePanelCollapsed] = useState(false); + const [isEntityPanelCollapsed, setEntityPanelCollapsed] = useState(false); useEffect(() => { fetchAllLayers(); @@ -50,9 +58,10 @@ export function MapView() { ); }, [setPosition, fetchAllLayers]); + const showPanelContainer = selectedFeature || selectedEntity; + return ( - <> - +
{!position ? (
Loading Map... @@ -80,6 +89,32 @@ export function MapView() { )} - + +
+ {selectedFeature && ( +
+ setFeaturePanelCollapsed((prev) => !prev)} + /> +
+ )} + {selectedEntity && ( +
+ setEntityPanelCollapsed((prev) => !prev)} + /> +
+ )} +
+
); } diff --git a/apps/client/src/pages/map/index.tsx b/apps/client/src/pages/map/index.tsx index 4cc92dc..588195c 100644 --- a/apps/client/src/pages/map/index.tsx +++ b/apps/client/src/pages/map/index.tsx @@ -15,7 +15,6 @@ import { import { MapView } from "@/features/map"; import { LayerSidebar } from "@/features/map-draw/LayerSidebar"; import { MapToolbar } from "@/features/map-draw/MapToolbar"; -import { EntityDetailsPanel } from "@/features/map-entity/EntityDetailsPanel"; import { WebRTCProvider } from "@/features/rtc/WebRTCProvider"; import { AppSidebar } from "@/features/sidebar"; @@ -47,7 +46,6 @@ export function MapPage() { - From 094ce1c5bcd0109682b70867491be077ac8d018c Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Mon, 22 Sep 2025 23:41:52 +0900 Subject: [PATCH 11/16] refactor: combine rtsp with rtp streams --- apps/client/src/features/entity/Card.tsx | 11 ++- apps/server/src/broker_rtp.rs | 4 +- apps/server/src/broker_rtsp.rs | 72 ++++++++++++--- apps/server/src/handler/streams.rs | 4 +- apps/server/src/handler/ws/handlers.rs | 109 ++++++----------------- apps/server/src/initial_db.rs | 4 +- apps/server/src/lib/entity_map.rs | 25 +++--- apps/server/src/lib/stream_checker.rs | 7 +- apps/server/src/main.rs | 12 +-- apps/server/src/state.rs | 11 +-- 10 files changed, 124 insertions(+), 135 deletions(-) diff --git a/apps/client/src/features/entity/Card.tsx b/apps/client/src/features/entity/Card.tsx index ef8eeb3..a278251 100644 --- a/apps/client/src/features/entity/Card.tsx +++ b/apps/client/src/features/entity/Card.tsx @@ -89,7 +89,16 @@ export function EntityCard({
{item.platform}
-
+
+ {isEnabledStream( + item.configuration.rtsp_url as string, + streamsState, + ) ? ( + + ) : ( + + )} +
); diff --git a/apps/server/src/broker_rtp.rs b/apps/server/src/broker_rtp.rs index 3be515a..c918587 100644 --- a/apps/server/src/broker_rtp.rs +++ b/apps/server/src/broker_rtp.rs @@ -24,7 +24,7 @@ pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result let ssrc = packet.header.ssrc; if let Some(stream_info) = stream_manager.get(&ssrc) { - let mut is_online_guard = stream_info.is_online.write().await; + let mut is_online_guard = stream_info.is_online.write().unwrap(); if !*is_online_guard { *is_online_guard = true; info!( @@ -34,7 +34,7 @@ pub async fn rtp_receiver(addr: String, stream_manager: StreamManager) -> Result } drop(is_online_guard); - let mut last_seen_guard = stream_info.last_seen.write().await; + let mut last_seen_guard = stream_info.last_seen.write().unwrap(); *last_seen_guard = Instant::now(); drop(last_seen_guard); diff --git a/apps/server/src/broker_rtsp.rs b/apps/server/src/broker_rtsp.rs index 2eee1bf..a84d744 100644 --- a/apps/server/src/broker_rtsp.rs +++ b/apps/server/src/broker_rtsp.rs @@ -1,13 +1,18 @@ use anyhow::{anyhow, Error, Result}; -use bytes::Bytes; use futures_util::StreamExt; use gstreamer::prelude::*; use gstreamer_app::{AppSink, AppSinkCallbacks}; -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, +}; use tokio::sync::{broadcast, watch}; +use tokio::time::Instant; use tracing::{error, info, warn}; +use webrtc::rtp::packet::Packet; +use webrtc::util::Unmarshal; -use crate::state::{AppState, FrameData, Protocol}; +use crate::state::{AppState, MediaType, Protocol, StreamInfo}; pub async fn start_rtsp_pipelines( app_state: Arc, @@ -35,15 +40,23 @@ pub async fn start_rtsp_pipelines( let mut join_set = tokio::task::JoinSet::new(); for mapping in rtsp_mappings { - let frame_tx = app_state.rtsp_frame_tx.clone(); + let state = Arc::clone(&app_state); let topic = mapping.topic.clone(); + let user_id = mapping.entity_id.clone(); let mut shutdown_rx = shutdown_rx.clone(); + let (packet_tx, _) = broadcast::channel(1024); + join_set.spawn(async move { loop { let mut shutdown_rx_clone = shutdown_rx.clone(); - let pipeline_future = - run_pipeline(&topic, frame_tx.clone(), &mut shutdown_rx_clone); + let pipeline_future = run_pipeline( + &topic, + &user_id, + packet_tx.clone(), + Arc::clone(&state), + &mut shutdown_rx_clone, + ); tokio::select! { _ = shutdown_rx.changed() => { @@ -79,11 +92,13 @@ pub async fn start_rtsp_pipelines( async fn run_pipeline( rtsp_url: &str, - frame_tx: broadcast::Sender, + user_id: &str, + packet_tx: broadcast::Sender, + state: Arc, shutdown_rx: &mut watch::Receiver<()>, ) -> Result<(), Error> { let pipeline_str = format!( - "rtspsrc location={0} latency=0 ! rtph264depay ! h264parse config-interval=-1 ! video/x-h264,stream-format=byte-stream,alignment=au ! appsink name=sink emit-signals=true", + "rtspsrc location={} latency=0 ! application/x-rtp,media=video,encoding-name=H264 ! appsink name=sink emit-signals=true", rtsp_url ); @@ -97,7 +112,10 @@ async fn run_pipeline( .downcast::() .map_err(|_| anyhow!("Sink element is not an AppSink"))?; + let is_registered = Arc::new(AtomicBool::new(false)); let rtsp_url_clone = rtsp_url.to_string(); + let user_id_clone = user_id.to_string(); + appsink.set_callbacks( AppSinkCallbacks::builder() .new_sample(move |sink| { @@ -107,20 +125,46 @@ async fn run_pipeline( .map_readable() .map_err(|_| gstreamer::FlowError::Error)?; - let frame_data = FrameData { - topic: rtsp_url_clone.clone(), - buffer: Bytes::copy_from_slice(map.as_slice()), - }; + match Packet::unmarshal(&mut map.as_slice()) { + Ok(packet) => { + let ssrc: u32 = packet.header.ssrc; + + if !is_registered.load(Ordering::SeqCst) { + info!( + "RTSP stream {} registered with SSRC: {}", + rtsp_url_clone, ssrc + ); + + let stream_info = StreamInfo { + topic: rtsp_url_clone.clone(), + user_id: user_id_clone.clone(), + packet_tx: packet_tx.clone(), + media_type: MediaType::Video, + last_seen: Arc::new(RwLock::new(Instant::now())), + is_online: Arc::new(RwLock::new(true)), + }; + state.streams.insert(ssrc, stream_info); + is_registered.store(true, Ordering::SeqCst); + } - if frame_tx.send(frame_data).is_err() {} + if let Some(stream_info) = state.streams.get(&ssrc) { + *stream_info.last_seen.write().unwrap() = Instant::now(); + } + if packet_tx.send(packet).is_err() { + // Receiver is gone, maybe log this. + } + } + Err(e) => { + warn!("Failed to unmarshal RTP packet from GStreamer: {}", e); + } + } Ok(gstreamer::FlowSuccess::Ok) }) .build(), ); pipeline_bin.set_state(gstreamer::State::Playing)?; - let bus = pipeline_bin.bus().unwrap(); let mut bus_stream = bus.stream(); diff --git a/apps/server/src/handler/streams.rs b/apps/server/src/handler/streams.rs index d8a8891..336841f 100644 --- a/apps/server/src/handler/streams.rs +++ b/apps/server/src/handler/streams.rs @@ -61,8 +61,8 @@ pub async fn register_stream( user_id: auth.device_id, packet_tx, media_type: payload.media_type, - last_seen: Arc::new(RwLock::new(Instant::now())), - is_online: Arc::new(RwLock::new(false)), + last_seen: Arc::new(std::sync::RwLock::new(Instant::now())), + is_online: Arc::new(std::sync::RwLock::new(false)), }; state.streams.insert(ssrc, stream_info); diff --git a/apps/server/src/handler/ws/handlers.rs b/apps/server/src/handler/ws/handlers.rs index 27e69cc..a27b6d7 100644 --- a/apps/server/src/handler/ws/handlers.rs +++ b/apps/server/src/handler/ws/handlers.rs @@ -245,7 +245,7 @@ impl WSActor { let stream_futures = self.state.streams.iter().map(|n| async move { StreamState { topic: n.topic.clone(), - is_online: *n.is_online.read().await, + is_online: *n.is_online.read().unwrap(), } }); @@ -385,16 +385,33 @@ impl WSActor { return; } - if let Some((ssrc, info)) = self + let audio_stream_info = self .state .streams .iter() .find(|entry| { entry.value().topic == topic && entry.value().media_type == MediaType::Audio }) - .map(|entry| (*entry.key(), entry.value().clone())) - { + .map(|entry| entry.value().clone()); + + let video_stream_info = self + .state + .streams + .iter() + .find(|entry| { + entry.value().topic == topic && entry.value().media_type == MediaType::Video + }) + .map(|entry| entry.value().clone()); + + if let Some(info) = audio_stream_info { subscribed = true; + let ssrc = self + .state + .streams + .iter() + .find(|e| e.value().topic == info.topic && e.value().media_type == MediaType::Audio) + .map(|e| *e.key()) + .unwrap_or(0); info!( "[Audio] Subscribing to topic '{}' with SSRC {}", topic, ssrc @@ -472,16 +489,15 @@ impl WSActor { }); } - if let Some((ssrc, info)) = self - .state - .streams - .iter() - .find(|entry| { - entry.value().topic == topic && entry.value().media_type == MediaType::Video - }) - .map(|entry| (*entry.key(), entry.value().clone())) - { + if let Some(info) = video_stream_info { subscribed = true; + let ssrc = self + .state + .streams + .iter() + .find(|e| e.value().topic == info.topic && e.value().media_type == MediaType::Video) + .map(|e| *e.key()) + .unwrap_or(0); info!( "[Video-UDP] Subscribing to topic '{}' with SSRC {}", topic, ssrc @@ -562,73 +578,6 @@ impl WSActor { }); } - if self - .state - .topic_map - .read() - .await - .iter() - .any(|m| m.topic == topic && m.protocol == crate::state::Protocol::RTSP) - { - subscribed = true; - info!("[Video] Subscribing to topic '{}'", topic); - - let new_rtsp_video_track = Arc::new(TrackLocalStaticSample::new( - RTCRtpCodecCapability { - mime_type: MIME_TYPE_H264.to_owned(), - ..Default::default() - }, - format!("video-rtsp-{}", &topic), - format!("webrtc-stream-rtsp-{}", &topic), - )); - - let rtp_sender = self - .pc - .add_track(Arc::clone(&new_rtsp_video_track) as Arc) - .await - .unwrap(); - - self.active_tracks.insert( - topic.clone(), - new_rtsp_video_track.clone() as Arc, - ); - - tokio::spawn(async move { - let mut rtcp_buf = vec![0u8; 1500]; - while rtp_sender.read(&mut rtcp_buf).await.is_ok() {} - }); - - let mut rx = self.state.rtsp_frame_tx.subscribe(); - let topic_clone = topic.clone(); - - tokio::spawn(async move { - info!( - "[Video] Frame forwarding started for topic: {}", - topic_clone - ); - while let Ok(frame) = rx.recv().await { - if frame.topic == topic_clone { - let sample = webrtc::media::Sample { - data: frame.buffer.clone(), - duration: std::time::Duration::from_millis(33), - ..Default::default() - }; - if new_rtsp_video_track.write_sample(&sample).await.is_err() { - warn!( - "[Video] Frame write failed for topic {}, stopping.", - topic_clone - ); - break; - } - } - } - info!( - "[Video] Frame forwarding stopped for topic: {}", - topic_clone - ); - }); - } - if subscribed { info!("Track added. Starting renegotiation..."); match self.pc.create_offer(None).await { diff --git a/apps/server/src/initial_db.rs b/apps/server/src/initial_db.rs index 751be5e..564f0e0 100644 --- a/apps/server/src/initial_db.rs +++ b/apps/server/src/initial_db.rs @@ -186,8 +186,8 @@ pub fn create_hydrate_streams(pool: &DbPool, streams: &Arc>) -> Result { let entities = db::repository::get_all_entities_with_configs(&state.pool)?; - + let mut new_mappings = Vec::new(); - for entity_with_config in &entities { + for entity_with_config in &entities { if let Some(platform) = &entity_with_config.entity.platform { - let protocol = match platform.as_str() { "MQTT" => Some(Protocol::MQTT), "udp" => Some(Protocol::Udp), "lora" => Some(Protocol::Lora), "RTSP" => Some(Protocol::RTSP), - _ => None, + _ => None, }; if let (Some(proto), Some(config)) = (protocol, &entity_with_config.configuration) { - if let Some(topic) = config.get("state_topic").and_then(|v| v.as_str()) { new_mappings.push(TopicMapping { protocol: proto.clone(), @@ -40,7 +40,7 @@ pub async fn remap_topics(State(state): State>) -> Result>) -> Result STREAM_TIMEOUT { - let mut is_online_guard = stream_info.is_online.write().await; + let mut is_online_guard = stream_info.is_online.write().unwrap(); if *is_online_guard { *is_online_guard = false; info!( @@ -60,4 +60,3 @@ pub async fn stream_status_checker( Ok(()) } - diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index b0f33fd..be03c64 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -25,7 +25,7 @@ use crate::{ lib::{entity_map::remap_topics, stream_checker::stream_status_checker}, logo::print_logo, routes::web_server, - state::{AppState, FrameData, MediaType, MqttMessage, StreamInfo, StreamManager}, + state::{AppState, MediaType, MqttMessage, StreamInfo, StreamManager}, }; mod broker_mqtt; @@ -96,12 +96,11 @@ async fn main() -> Result<()> { } let (mqtt_tx, _) = broadcast::channel::(1024); - let (rtsp_frame_tx, _) = broadcast::channel::(256); - - let jwt_secret = settings.jwt_secret.clone(); - let (flow_manager_tx, flow_manager_rx) = mpsc::channel(100); let (broadcast_tx, _) = broadcast::channel(1024); + let (shutdown_tx, shutdown_rx) = watch::channel(()); + + let jwt_secret = settings.jwt_secret.clone(); let app_state = Arc::new(AppState { streams: streams.clone(), @@ -109,13 +108,10 @@ async fn main() -> Result<()> { jwt_secret: jwt_secret, pool: pool, topic_map: Arc::new(RwLock::new(Vec::new())), - rtsp_frame_tx, flow_manager_tx, broadcast_tx, }); - let (shutdown_tx, shutdown_rx) = watch::channel(()); - let configs = db::repository::get_all_system_configs(&app_state.clone().pool)?; let mut set = JoinSet::new(); diff --git a/apps/server/src/state.rs b/apps/server/src/state.rs index 9ec6efa..ed6733a 100644 --- a/apps/server/src/state.rs +++ b/apps/server/src/state.rs @@ -27,8 +27,8 @@ pub struct StreamInfo { pub user_id: String, pub packet_tx: broadcast::Sender, pub media_type: MediaType, - pub last_seen: Arc>, - pub is_online: Arc>, + pub last_seen: Arc>, + pub is_online: Arc>, } pub type StreamManager = Arc>; @@ -39,12 +39,6 @@ pub struct MqttMessage { pub bytes: Bytes, } -#[derive(Clone, Debug)] -pub struct FrameData { - pub topic: String, - pub buffer: Bytes, -} - #[derive(Serialize, Clone, Debug, PartialEq, Eq)] pub enum Protocol { MQTT, @@ -66,7 +60,6 @@ pub struct AppState { pub jwt_secret: String, pub pool: DbPool, pub topic_map: Arc>>, - pub rtsp_frame_tx: broadcast::Sender, pub flow_manager_tx: mpsc::Sender, pub broadcast_tx: broadcast::Sender, } From e75cec36af84ebeface27e38155c98153eb50aa3 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Tue, 23 Sep 2025 16:03:07 +0900 Subject: [PATCH 12/16] feat: add ws flow input with system config --- apps/server/src/flow/engine.rs | 14 ++++++-- apps/server/src/flow/manager_state.rs | 6 +++- apps/server/src/flow/nodes/websocket_on.rs | 33 +++++++++++------- apps/server/src/flow/nodes/websocket_send.rs | 22 +++++++++--- apps/server/src/lib/mod.rs | 1 + apps/server/src/lib/system_configs.rs | 36 ++++++++++++++++++++ apps/server/src/main.rs | 5 +-- apps/server/src/state.rs | 2 ++ 8 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 apps/server/src/lib/system_configs.rs diff --git a/apps/server/src/flow/engine.rs b/apps/server/src/flow/engine.rs index ed54447..a28851e 100644 --- a/apps/server/src/flow/engine.rs +++ b/apps/server/src/flow/engine.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use tokio::time; use tracing::{error, info}; +use crate::db::models::SystemConfiguration; use crate::flow::nodes::branch::BranchNode; use crate::flow::nodes::custom_node::CustomNode; use crate::flow::nodes::decode_h264::DecodeH264Node; @@ -91,6 +92,7 @@ pub struct FlowEngine { mqtt_tx: Option>, stream_manager: StreamManager, binary_store: BinaryStore, + system_configs: Vec, } impl FlowEngine { @@ -99,6 +101,7 @@ impl FlowEngine { mqtt_tx: Option>, stream_manager: StreamManager, binary_store: BinaryStore, + system_configs: Vec, ) -> Result { let nodes_map: HashMap = graph.nodes.into_iter().map(|n| (n.id.clone(), n)).collect(); @@ -151,6 +154,7 @@ impl FlowEngine { mqtt_tx, stream_manager, binary_store, + system_configs, }) } @@ -228,8 +232,14 @@ impl FlowEngine { self.binary_store.clone(), )?)), - "WEBSOCKET_ON" => Ok(Box::new(WebSocketOnNode::new(&node.data)?)), - "WEBSOCKET_SEND" => Ok(Box::new(WebSocketSendNode::new(&node.data)?)), + "WEBSOCKET_ON" => Ok(Box::new(WebSocketOnNode::new( + &node.data, + self.system_configs.clone(), + )?)), + "WEBSOCKET_SEND" => Ok(Box::new(WebSocketSendNode::new( + &node.data, + self.system_configs.clone(), + )?)), s if s.starts_with('_') => Ok(Box::new(CustomNode::new(&node)?)), _ => Err(anyhow!( "Unknown or unimplemented node type: {}", diff --git a/apps/server/src/flow/manager_state.rs b/apps/server/src/flow/manager_state.rs index da256dd..36806ff 100644 --- a/apps/server/src/flow/manager_state.rs +++ b/apps/server/src/flow/manager_state.rs @@ -1,5 +1,5 @@ use crate::{ - db, + db::{self, models::SystemConfiguration}, flow::{ engine::{FlowController, FlowEngine}, types::Graph, @@ -44,6 +44,7 @@ pub struct FlowManagerActor { mqtt_tx: broadcast::Sender, stream_manager: StreamManager, binary_store: BinaryStore, + system_configs: Vec, } impl FlowManagerActor { @@ -52,6 +53,7 @@ impl FlowManagerActor { mqtt_client: Option, mqtt_tx: broadcast::Sender, stream_manager: StreamManager, + system_configs: Vec, ) -> Self { Self { receiver, @@ -60,6 +62,7 @@ impl FlowManagerActor { mqtt_tx, stream_manager, binary_store: BinaryStore::new(), + system_configs, } } @@ -81,6 +84,7 @@ impl FlowManagerActor { Some(self.mqtt_tx.clone()), self.stream_manager.clone(), self.binary_store.clone(), + self.system_configs.clone(), ) { Ok(engine) => { let engine = Arc::new(engine); diff --git a/apps/server/src/flow/nodes/websocket_on.rs b/apps/server/src/flow/nodes/websocket_on.rs index d43be16..6692fc5 100644 --- a/apps/server/src/flow/nodes/websocket_on.rs +++ b/apps/server/src/flow/nodes/websocket_on.rs @@ -17,9 +17,13 @@ use tracing::{error, info, warn}; use url::Url; use super::ExecutableNode; -use crate::flow::{ - engine::{ExecutionContext, TriggerCommand}, - types::ExecutionResult, +use crate::{ + db::models::SystemConfiguration, + flow::{ + engine::{ExecutionContext, TriggerCommand}, + types::ExecutionResult, + }, + lib::system_configs::replace_config_placeholders, }; #[derive(Deserialize, Debug, Clone)] @@ -30,12 +34,16 @@ pub struct WebSocketOnNodeData { pub struct WebSocketOnNode { data: WebSocketOnNodeData, + system_configs: Vec, } impl WebSocketOnNode { - pub fn new(node_data: &Value) -> Result { + pub fn new(node_data: &Value, system_configs: Vec) -> Result { let data: WebSocketOnNodeData = serde_json::from_value(node_data.clone())?; - Ok(Self { data }) + Ok(Self { + data, + system_configs, + }) } } @@ -110,29 +118,30 @@ impl ExecutableNode for WebSocketOnNode { node_id: String, trigger_tx: mpsc::Sender, ) -> Result> { - let url_str = self.data.url.clone(); + let url_str: String = self.data.url.clone(); + let result_url: String = replace_config_placeholders(&url_str, &self.system_configs); let handle = tokio::spawn(async move { loop { - if let Err(e) = Url::parse(&url_str) { + if let Err(e) = Url::parse(&result_url) { error!( "Invalid WebSocket URL '{}': {}. Aborting trigger.", - url_str, e + result_url, e ); break; } - info!("Connecting to WebSocket at {}", url_str); - match connect_async(&url_str).await { + info!("Connecting to WebSocket at {}", result_url); + match connect_async(&result_url).await { Ok((ws_stream, _)) => { - info!("Successfully connected to WebSocket: {}", url_str); + info!("Successfully connected to WebSocket: {}", result_url); handle_connection(ws_stream, &trigger_tx, &node_id).await; warn!("WebSocket connection lost. Reconnecting in 5 seconds..."); } Err(e) => { error!( "Failed to connect to WebSocket '{}': {}. Retrying in 5 seconds...", - url_str, e + result_url, e ); } } diff --git a/apps/server/src/flow/nodes/websocket_send.rs b/apps/server/src/flow/nodes/websocket_send.rs index c7394df..6fd721e 100644 --- a/apps/server/src/flow/nodes/websocket_send.rs +++ b/apps/server/src/flow/nodes/websocket_send.rs @@ -8,7 +8,11 @@ use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; use tracing::{error, info}; use super::ExecutableNode; -use crate::flow::{engine::ExecutionContext, types::ExecutionResult}; +use crate::{ + db::models::SystemConfiguration, + flow::{engine::ExecutionContext, types::ExecutionResult}, + lib::system_configs::replace_config_placeholders, +}; #[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -18,12 +22,16 @@ struct WebSocketSendNodeData { pub struct WebSocketSendNode { data: WebSocketSendNodeData, + system_configs: Vec, } impl WebSocketSendNode { - pub fn new(node_data: &Value) -> Result { + pub fn new(node_data: &Value, system_configs: Vec) -> Result { let data: WebSocketSendNodeData = serde_json::from_value(node_data.clone())?; - Ok(Self { data }) + Ok(Self { + data, + system_configs, + }) } } @@ -35,6 +43,7 @@ impl ExecutableNode for WebSocketSendNode { inputs: HashMap, ) -> Result { let url_str = &self.data.url; + let result_url = replace_config_placeholders(url_str, &self.system_configs); let Some(payload) = inputs.get("payload") else { if let Ok(payload_str) = serde_json::to_string(&json!({ @@ -56,8 +65,11 @@ impl ExecutableNode for WebSocketSendNode { _ => to_string(payload)?, }; - info!("Connecting to WebSocket at {} to send message", url_str); - let (ws_stream, _) = connect_async(url_str) + info!( + "Connecting to WebSocket at {} to send message", + result_url.clone() + ); + let (ws_stream, _) = connect_async(result_url) .await .map_err(|e| anyhow!("Failed to connect to WebSocket: {}", e))?; diff --git a/apps/server/src/lib/mod.rs b/apps/server/src/lib/mod.rs index 04bfa1a..459a40f 100644 --- a/apps/server/src/lib/mod.rs +++ b/apps/server/src/lib/mod.rs @@ -1,3 +1,4 @@ pub mod entity_map; pub mod hash; pub mod stream_checker; +pub mod system_configs; diff --git a/apps/server/src/lib/system_configs.rs b/apps/server/src/lib/system_configs.rs new file mode 100644 index 0000000..3695382 --- /dev/null +++ b/apps/server/src/lib/system_configs.rs @@ -0,0 +1,36 @@ +use crate::db::models::SystemConfiguration; + +pub fn replace_config_placeholders(template: &str, configs: &[SystemConfiguration]) -> String { + let mut result = String::with_capacity(template.len()); + let mut last_end = 0; + + while let Some(start) = template[last_end..].find("{:") { + let absolute_start = last_end + start; + result.push_str(&template[last_end..absolute_start]); + + let search_area = &template[absolute_start + 2..]; + if let Some(end) = search_area.find('}') { + let absolute_end = absolute_start + 2 + end; + let key = &template[absolute_start + 2..absolute_end]; + + let value = configs + .iter() + .find(|config| config.key == key) + .map(|config| config.value.as_str()); + + if let Some(v) = value { + result.push_str(v); + } else { + result.push_str(&template[absolute_start..absolute_end + 1]); + } + + last_end = absolute_end + 1; + } else { + result.push_str(&template[absolute_start..absolute_start + 2]); + last_end = absolute_start + 2; + } + } + + result.push_str(&template[last_end..]); + result +} diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index be03c64..f704f27 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -101,6 +101,7 @@ async fn main() -> Result<()> { let (shutdown_tx, shutdown_rx) = watch::channel(()); let jwt_secret = settings.jwt_secret.clone(); + let configs = db::repository::get_all_system_configs(&pool)?; let app_state = Arc::new(AppState { streams: streams.clone(), @@ -110,10 +111,9 @@ async fn main() -> Result<()> { topic_map: Arc::new(RwLock::new(Vec::new())), flow_manager_tx, broadcast_tx, + system_configs: configs.clone(), }); - let configs = db::repository::get_all_system_configs(&app_state.clone().pool)?; - let mut set = JoinSet::new(); remap_topics(axum::extract::State(app_state.clone())).await; @@ -200,6 +200,7 @@ async fn main() -> Result<()> { mqtt_client_for_flow, mqtt_tx.clone(), streams.clone(), + configs.clone(), ); tokio::spawn(async move { flow_manager.run().await; diff --git a/apps/server/src/state.rs b/apps/server/src/state.rs index ed6733a..b79f6f5 100644 --- a/apps/server/src/state.rs +++ b/apps/server/src/state.rs @@ -12,6 +12,7 @@ pub type DbPool = r2d2::Pool>; use dashmap::DashMap; +use crate::db::models::SystemConfiguration; use crate::flow::manager_state::FlowManagerCommand; #[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq, Eq)] @@ -62,4 +63,5 @@ pub struct AppState { pub topic_map: Arc>>, pub flow_manager_tx: mpsc::Sender, pub broadcast_tx: broadcast::Sender, + pub system_configs: Vec, } From ea4d7bd093ac94c41746885d94cf14e9afd84fc4 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Tue, 23 Sep 2025 16:33:43 +0900 Subject: [PATCH 13/16] fix: reset flow state when leave --- apps/client/src/entities/flow/store.ts | 2 + apps/client/src/features/flow/Flow.tsx | 18 ++- apps/client/src/features/flow/Graph.tsx | 207 +++++++++++++----------- 3 files changed, 134 insertions(+), 93 deletions(-) diff --git a/apps/client/src/entities/flow/store.ts b/apps/client/src/entities/flow/store.ts index 5ef524e..4bd0be1 100644 --- a/apps/client/src/entities/flow/store.ts +++ b/apps/client/src/entities/flow/store.ts @@ -25,6 +25,7 @@ interface FlowState { createNewFlow: (name: string) => Promise; saveGraph: (comment?: string) => Promise; loadGraph: () => Promise; + resetFlowState: () => void; } export const useFlowStore = create((set, get) => ({ @@ -134,4 +135,5 @@ export const useFlowStore = create((set, get) => ({ console.error(error); } }, + resetFlowState: () => set({ nodes: [], edges: [], currentFlowId: null }), })); diff --git a/apps/client/src/features/flow/Flow.tsx b/apps/client/src/features/flow/Flow.tsx index 360b31a..fd9a5d4 100644 --- a/apps/client/src/features/flow/Flow.tsx +++ b/apps/client/src/features/flow/Flow.tsx @@ -39,8 +39,16 @@ import { RunFlowButton } from "./RunFlow"; import { FlowLog } from "../flow-log/FlowLog"; export default function FlowPage() { - const { nodes, edges, setNodes, setEdges, flows, fetchFlows, currentFlowId } = - useFlowStore(); + const { + nodes, + edges, + setNodes, + setEdges, + flows, + fetchFlows, + currentFlowId, + resetFlowState, + } = useFlowStore(); // const saveAndRunFlow = async () => { // await saveGraph(); @@ -49,7 +57,11 @@ export default function FlowPage() { useEffect(() => { fetchFlows(); - }, [fetchFlows]); + + return () => { + resetFlowState(); + }; + }, [fetchFlows, resetFlowState]); useEffect(() => { console.log("Fetched flows from API:", flows); diff --git a/apps/client/src/features/flow/Graph.tsx b/apps/client/src/features/flow/Graph.tsx index 43cc820..30393c3 100644 --- a/apps/client/src/features/flow/Graph.tsx +++ b/apps/client/src/features/flow/Graph.tsx @@ -1,4 +1,4 @@ -import { useState, useRef, useEffect, useCallback } from "react"; +import { useState, useRef, useEffect, useCallback, useMemo } from "react"; import * as d3 from "d3"; import { Plus, Minus, Lock, LockOpen, SquarePlus } from "lucide-react"; import { Button } from "@/components/ui/button"; @@ -83,89 +83,76 @@ export function Graph({ const [nodeRenderers, setNodeRenderers] = useState< Record - >({ - START: (g, d) => renderTitleNode(g, d), - SET_VARIABLE: (g, d) => renderVarNode(g, d, () => handleClickOption(d)), - SET_VARIABLE_WITH_EXEC: (g, d) => - renderVarNode(g, d, () => handleClickOption(d)), - - CONDITION: (g, d) => renderProcessingNode(g, d), - LOG_MESSAGE: (g, d) => renderProcessingNode(g, d), - CALCULATION: (g, d) => renderCalcNode(g, d, () => handleClickOption(d)), - HTTP_REQUEST: (g, d) => renderHttpNode(g, d, () => handleClickOption(d)), - INTERVAL: (g, d) => renderIntervalNode(g, d, () => handleClickOption(d)), - LOGIC_OPERATOR: (g, d) => renderLogicNode(g, d, () => handleClickOption(d)), - MQTT_PUBLISH: (g, d) => renderMQTTNode(g, d, () => handleClickOption(d)), - MQTT_SUBSCRIBE: (g, d) => renderMQTTNode(g, d, () => handleClickOption(d)), - TYPE_CONVERTER: (g, d) => - renderButtonNode(g, d, () => handleClickOption(d)), - RTP_STREAM_IN: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - DECODE_OPUS: (g, d) => renderProcessingNode(g, d), - DECODE_H264: (g, d) => renderProcessingNode(g, d), - - BRANCH: (g, d) => renderProcessingNode(g, d), - JSON_SELECTOR: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - JSON_MODIFY: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - YOLO_DETECT: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - GST_DECODER: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - - WEBSOCKET_SEND: (g, d) => - renderButtonNode(g, d, () => handleClickOption(d)), - WEBSOCKET_ON: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), - }); - - const nodeGroups: NodeGroup[] = [ - { - label: "Default", - nodes: ["START", "LOG_MESSAGE"], - }, - { - label: "Data", - nodes: [ - "SET_VARIABLE", - "SET_VARIABLE_WITH_EXEC", - "TYPE_CONVERTER", - "RTP_STREAM_IN", - "DECODE_OPUS", - "GST_DECODER", - ], - }, - { - label: "Logic", - nodes: [ - "CALCULATION", - "LOGIC_OPERATOR", - "INTERVAL", - "BRANCH", - "JSON_SELECTOR", - "JSON_MODIFY", - ], - }, - { - label: "Communication", - nodes: [ - "HTTP_REQUEST", - "MQTT_PUBLISH", - "MQTT_SUBSCRIBE", - "WEBSOCKET_SEND", - "WEBSOCKET_ON", - ], - }, - { - label: "AI/ML", - nodes: ["YOLO_DETECT"], - }, - { - label: "Custom Node", - nodes: customNodes.map((item) => { - try { - return item.node_type; - } catch { - return ""; - } - }), - }, - ]; + >({}); + + const nodeGroups: NodeGroup[] = useMemo(() => { + const baseGroups: NodeGroup[] = [ + { + label: "Default", + nodes: ["START", "LOG_MESSAGE"], + }, + { + label: "Data", + nodes: [ + "SET_VARIABLE", + "SET_VARIABLE_WITH_EXEC", + "TYPE_CONVERTER", + "RTP_STREAM_IN", + "DECODE_OPUS", + "GST_DECODER", + ], + }, + { + label: "Logic", + nodes: [ + "CALCULATION", + "LOGIC_OPERATOR", + "INTERVAL", + "BRANCH", + "JSON_SELECTOR", + "JSON_MODIFY", + ], + }, + { + label: "Communication", + nodes: [ + "HTTP_REQUEST", + "MQTT_PUBLISH", + "MQTT_SUBSCRIBE", + "WEBSOCKET_SEND", + "WEBSOCKET_ON", + ], + }, + { + label: "AI/ML", + nodes: ["YOLO_DETECT"], + }, + ]; + + // if (useRos2) { + // baseGroups.push({ + // label: "ROS2", + // nodes: ["ROS2_WEBSOCKET_ON", "ROS2_WEBSOCKET_SEND"], + // }); + // } + + const customNodeTypes = customNodes.map((item) => { + try { + return item.node_type; + } catch { + return ""; + } + }); + + if (customNodeTypes.length > 0) { + baseGroups.push({ + label: "Custom Node", + nodes: customNodeTypes, + }); + } + + return baseGroups; + }, [customNodes]); const handleClickOption = (node: Node) => { setOpenedNode(node); @@ -177,18 +164,58 @@ export function Graph({ }; useEffect(() => { - const customNodeRenderers = customNodes.reduce((acc, customNode) => { - const data = JSON.parse(customNode.data); + const baseRenderers: Record = { + START: (g, d) => renderTitleNode(g, d), + SET_VARIABLE: (g, d) => renderVarNode(g, d, () => handleClickOption(d)), + SET_VARIABLE_WITH_EXEC: (g, d) => + renderVarNode(g, d, () => handleClickOption(d)), + CONDITION: (g, d) => renderProcessingNode(g, d), + LOG_MESSAGE: (g, d) => renderProcessingNode(g, d), + CALCULATION: (g, d) => renderCalcNode(g, d, () => handleClickOption(d)), + HTTP_REQUEST: (g, d) => renderHttpNode(g, d, () => handleClickOption(d)), + INTERVAL: (g, d) => renderIntervalNode(g, d, () => handleClickOption(d)), + LOGIC_OPERATOR: (g, d) => + renderLogicNode(g, d, () => handleClickOption(d)), + MQTT_PUBLISH: (g, d) => renderMQTTNode(g, d, () => handleClickOption(d)), + MQTT_SUBSCRIBE: (g, d) => + renderMQTTNode(g, d, () => handleClickOption(d)), + TYPE_CONVERTER: (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)), + RTP_STREAM_IN: (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)), + DECODE_OPUS: (g, d) => renderProcessingNode(g, d), + DECODE_H264: (g, d) => renderProcessingNode(g, d), + BRANCH: (g, d) => renderProcessingNode(g, d), + JSON_SELECTOR: (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)), + JSON_MODIFY: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), + YOLO_DETECT: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), + GST_DECODER: (g, d) => renderButtonNode(g, d, () => handleClickOption(d)), + WEBSOCKET_SEND: (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)), + WEBSOCKET_ON: (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)), + }; + + // if (availableSystemNodes.ROS2_WEBSOCKET_ON) { + // baseRenderers.ROS2_WEBSOCKET_ON = (g, d) => + // renderButtonNode(g, d, () => handleClickOption(d)); + // } + // if (availableSystemNodes.ROS2_WEBSOCKET_SEND) { + // baseRenderers.ROS2_WEBSOCKET_SEND = (g, d) => + // renderButtonNode(g, d, () => handleClickOption(d)); + // } - acc[data.nodeType] = (g, d) => + const customNodeRenderers = customNodes.reduce((acc, customNode) => { + acc[customNode.node_type] = (g, d) => renderButtonNode(g, d, () => handleClickOption(d)); return acc; }, {} as Record); - setNodeRenderers((prevRenderers) => ({ - ...prevRenderers, + setNodeRenderers({ + ...baseRenderers, ...customNodeRenderers, - })); + }); }, [customNodes]); useEffect(() => { From e0c9ef3700f1d076c7281d41a15480f7e261b1f9 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Tue, 23 Sep 2025 16:54:05 +0900 Subject: [PATCH 14/16] feat: add custom extension flow node --- apps/client/src/features/flow/Graph.tsx | 44 ++++++++++++++-------- apps/client/src/features/flow/flowNode.ts | 31 ++++++++------- apps/client/src/features/flow/flowUtils.ts | 38 ++++++++++++++++++- 3 files changed, 82 insertions(+), 31 deletions(-) diff --git a/apps/client/src/features/flow/Graph.tsx b/apps/client/src/features/flow/Graph.tsx index 30393c3..1a90995 100644 --- a/apps/client/src/features/flow/Graph.tsx +++ b/apps/client/src/features/flow/Graph.tsx @@ -37,6 +37,8 @@ import { renderButtonNode } from "./nodes/ButtonNode"; import { zoomIdentity } from "d3-zoom"; import { AddCustomNode } from "./AddCustomNode"; import { useCustomNodeStore } from "@/entities/custom-nodes/store"; +import { useConfigStore } from "@/entities/configurations/store"; +import { isValidConfig } from "../integration/validate"; type NodeGroup = { label: string; @@ -49,6 +51,8 @@ const nodeHoverColor = "#444754"; const nodeLightColor = "#d1d4e3"; const highlightColor = "#1976d2"; +const ros2NodeKey = ["EXT_ROS2_WEBSOCKET_ON", "EXT_ROS2_WEBSOCKET_SEND"]; + export function Graph({ nodes, edges, @@ -81,6 +85,12 @@ export function Graph({ const edgesRef = useRef(edges); const nodesRef = useRef(nodes); + const { configurations, fetchConfigs } = useConfigStore(); + + useEffect(() => { + fetchConfigs(); + }, [fetchConfigs]); + const [nodeRenderers, setNodeRenderers] = useState< Record >({}); @@ -129,12 +139,14 @@ export function Graph({ }, ]; - // if (useRos2) { - // baseGroups.push({ - // label: "ROS2", - // nodes: ["ROS2_WEBSOCKET_ON", "ROS2_WEBSOCKET_SEND"], - // }); - // } + const isRos2Connected = isValidConfig(configurations, "ROS"); + + if (isRos2Connected) { + baseGroups.push({ + label: "ROS2", + nodes: ros2NodeKey, + }); + } const customNodeTypes = customNodes.map((item) => { try { @@ -152,7 +164,7 @@ export function Graph({ } return baseGroups; - }, [customNodes]); + }, [customNodes, configurations]); const handleClickOption = (node: Node) => { setOpenedNode(node); @@ -197,14 +209,14 @@ export function Graph({ renderButtonNode(g, d, () => handleClickOption(d)), }; - // if (availableSystemNodes.ROS2_WEBSOCKET_ON) { - // baseRenderers.ROS2_WEBSOCKET_ON = (g, d) => - // renderButtonNode(g, d, () => handleClickOption(d)); - // } - // if (availableSystemNodes.ROS2_WEBSOCKET_SEND) { - // baseRenderers.ROS2_WEBSOCKET_SEND = (g, d) => - // renderButtonNode(g, d, () => handleClickOption(d)); - // } + const isRos2Connected = isValidConfig(configurations, "ROS"); + + if (isRos2Connected) { + ros2NodeKey.forEach((element) => { + baseRenderers[element] = (g, d) => + renderButtonNode(g, d, () => handleClickOption(d)); + }); + } const customNodeRenderers = customNodes.reduce((acc, customNode) => { acc[customNode.node_type] = (g, d) => @@ -216,7 +228,7 @@ export function Graph({ ...baseRenderers, ...customNodeRenderers, }); - }, [customNodes]); + }, [customNodes, configurations]); useEffect(() => { edgesRef.current = edges; diff --git a/apps/client/src/features/flow/flowNode.ts b/apps/client/src/features/flow/flowNode.ts index 521a0c7..0577d00 100644 --- a/apps/client/src/features/flow/flowNode.ts +++ b/apps/client/src/features/flow/flowNode.ts @@ -1,6 +1,6 @@ import { Node } from "./flowTypes"; -type DefaultValueType = { +export type DefaultValueType = { connectors: Node["connectors"]; nodeType: string; data: Node["data"]; @@ -292,21 +292,24 @@ export const DEFINITION_NODE = { url: "STRING", }, }, -} as const; - -export const CUSTOM_NODE: DefaultValueType[] = [ - { - connectors: [ - { id: "id", name: "number", type: "in" }, - { id: "id", name: "number", type: "out" }, - ], - nodeType: "_PYTHON_CUSTOM_NODE", - + EXT_ROS2_WEBSOCKET_ON: { + connectors: [{ id: "id", name: "payload", type: "out" }], + nodeType: "WEBSOCKET_ON", data: { - path: "{:code}/add_node.py", + url: "{:ros2_websocket_url}", }, dataType: { - path: "FIXED_STRING", + url: "FIXED_STRING", }, }, -]; + EXT_ROS2_WEBSOCKET_SEND: { + connectors: [{ id: "id", name: "payload", type: "in" }], + nodeType: "WEBSOCKET_SEND", + data: { + url: "{:ros2_websocket_url}", + }, + dataType: { + url: "FIXED_STRING", + }, + }, +} as const; diff --git a/apps/client/src/features/flow/flowUtils.ts b/apps/client/src/features/flow/flowUtils.ts index 6218c4b..1abd8e8 100644 --- a/apps/client/src/features/flow/flowUtils.ts +++ b/apps/client/src/features/flow/flowUtils.ts @@ -1,6 +1,6 @@ import { CustomNode } from "@/entities/custom-nodes/types"; import { DEFINITION_NODE } from "./flowNode"; -import { NodeTypes, Node } from "./flowTypes"; +import { NodeTypes, Node, DataNodeTypeType } from "./flowTypes"; export function getDefalutValue(type: NodeTypes, id: string) { const value = JSON.parse(JSON.stringify(DEFINITION_NODE[type])); @@ -84,3 +84,39 @@ export function getCustomNode( ...defaultValue, }; } + +export function getNodeValue( + nodes: DataNodeTypeType, + type: NodeTypes, + id: string, +) { + const value = JSON.parse(JSON.stringify(nodes[type])); + + for (let index = 0; index < value.connectors.length; index++) { + value.connectors[ + index + ].id = `${id}-${value.connectors[index].type}${index}`; + } + + return value; +} + +export function getNode( + nodes: DataNodeTypeType, + type: NodeTypes, + id: string, + x: number = 100, + y: number = 100, +): Node { + const defaultValue = getNodeValue(nodes, type, id); + + return { + id: id, + title: id, + x: x, + y: y, + width: 120, + height: 50, + ...defaultValue, + }; +} From 67b201db1209ac4c83211d642739e4b849c45397 Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Tue, 23 Sep 2025 17:11:39 +0900 Subject: [PATCH 15/16] fix: resize map width --- apps/client/src/features/map/index.tsx | 27 ++++++++++-- apps/client/src/pages/map/index.tsx | 59 +++++++++++++++----------- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/apps/client/src/features/map/index.tsx b/apps/client/src/features/map/index.tsx index d6750bf..cd71700 100644 --- a/apps/client/src/features/map/index.tsx +++ b/apps/client/src/features/map/index.tsx @@ -1,5 +1,5 @@ import { useState, useEffect } from "react"; -import { MapContainer, TileLayer } from "react-leaflet"; +import { MapContainer, TileLayer, useMap } from "react-leaflet"; import "leaflet/dist/leaflet.css"; import L from "leaflet"; @@ -29,7 +29,27 @@ const failedPosition = [39.8283, -98.5795]; L.Marker.prototype.options.icon = DefaultIcon; -export function MapView() { +function MapResizer({ isSidebarCollapsed }: { isSidebarCollapsed: boolean }) { + const map = useMap(); + + useEffect(() => { + const timer = setTimeout(() => { + map.invalidateSize(); + }, 310); + + return () => { + clearTimeout(timer); + }; + }, [isSidebarCollapsed, map]); + + return null; +} + +export function MapView({ + isSidebarCollapsed, +}: { + isSidebarCollapsed: boolean; +}) { const [position, setPosition] = useState<[number, number] | null>(null); const { layer, fetchAllLayers } = useMapDataStore(); const { selectedFeature } = useMapInteractionStore(); @@ -78,15 +98,14 @@ export function MapView() { attribution='© OpenStreetMap contributors' url='https://{s}.basemaps.cartocdn.com/dark_all/{z}/{x}/{y}{r}.png' /> - {layer?.features.map((feature) => ( ))} - + )} diff --git a/apps/client/src/pages/map/index.tsx b/apps/client/src/pages/map/index.tsx index 588195c..4dc2ae5 100644 --- a/apps/client/src/pages/map/index.tsx +++ b/apps/client/src/pages/map/index.tsx @@ -11,6 +11,7 @@ import { SidebarInset, SidebarProvider, SidebarTrigger, + useSidebar, } from "@/components/ui/sidebar"; import { MapView } from "@/features/map"; import { LayerSidebar } from "@/features/map-draw/LayerSidebar"; @@ -18,36 +19,44 @@ import { MapToolbar } from "@/features/map-draw/MapToolbar"; import { WebRTCProvider } from "@/features/rtc/WebRTCProvider"; import { AppSidebar } from "@/features/sidebar"; +export function MapLayout() { + const { open } = useSidebar(); + + return ( + +
+ + + + + + / + + + + Map + + + +
+
+ + + +
+
+ ); +} + export function MapPage() { return ( - -
- - - - - - / - - - - Map - - - -
-
- - - -
-
+
); From bfcd127739247d11d70a672a69222003b0901edb Mon Sep 17 00:00:00 2001 From: DipokalLab Date: Tue, 23 Sep 2025 18:19:56 +0900 Subject: [PATCH 16/16] feat: add python custom node utils --- apps/client/src/features/flow/AddCustomNode.tsx | 5 ++++- packages/custom-node-utils/add_number.py | 10 ++++++++++ packages/custom-node-utils/random.py | 9 +++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 packages/custom-node-utils/add_number.py create mode 100644 packages/custom-node-utils/random.py diff --git a/apps/client/src/features/flow/AddCustomNode.tsx b/apps/client/src/features/flow/AddCustomNode.tsx index cbbe429..58a80ef 100644 --- a/apps/client/src/features/flow/AddCustomNode.tsx +++ b/apps/client/src/features/flow/AddCustomNode.tsx @@ -165,7 +165,10 @@ export function AddCustomNode() { {view === "list" && (
-
+
+ diff --git a/packages/custom-node-utils/add_number.py b/packages/custom-node-utils/add_number.py new file mode 100644 index 0000000..ecdf324 --- /dev/null +++ b/packages/custom-node-utils/add_number.py @@ -0,0 +1,10 @@ +def main(inputs): + a = inputs.get("a", 0) + b = inputs.get("b", 0) + + + outputs = { + "number": a+b + } + + return outputs \ No newline at end of file diff --git a/packages/custom-node-utils/random.py b/packages/custom-node-utils/random.py new file mode 100644 index 0000000..9f9fb44 --- /dev/null +++ b/packages/custom-node-utils/random.py @@ -0,0 +1,9 @@ +import random + +def main(inputs): + result = random.random() + outputs = { + "number": result + } + + return outputs