diff --git a/src/om1_vlm/anonymizationSys/face_recog_stream/rtsp_video_writer.py b/src/om1_vlm/anonymizationSys/face_recog_stream/rtsp_video_writer.py index 6b0bcfb..7de2614 100644 --- a/src/om1_vlm/anonymizationSys/face_recog_stream/rtsp_video_writer.py +++ b/src/om1_vlm/anonymizationSys/face_recog_stream/rtsp_video_writer.py @@ -37,6 +37,8 @@ def __init__( estimated_fps: int = 30, local_rtsp_url: Optional[str] = "rtsp://localhost:8554/live", remote_rtsp_url: Optional[str] = None, + raw_local_rtsp_url: Optional[str] = None, + raw_remote_rtsp_url: Optional[str] = None, use_hwenc: bool = True, # NEW: prefer hardware encoding ): """ @@ -51,23 +53,32 @@ def __init__( estimated_fps : int, optional Estimated frames per second for the video stream, by default 15. local_rtsp_url : Optional[str], optional - Local RTSP URL to stream to, by default "rtsp://localhost:8554/live". + Local RTSP URL to stream processed video to, by default "rtsp://localhost:8554/live". remote_rtsp_url : Optional[str], optional - Remote RTSP URL to stream to, by default None. + Remote RTSP URL to stream processed video to, by default None. + raw_local_rtsp_url : Optional[str], optional + Local RTSP URL to stream raw video to, by default None. + raw_remote_rtsp_url : Optional[str], optional + Remote RTSP URL to stream raw video to, by default None. use_hwenc : bool If True, try to use NVENC hardware encoder (Jetson/NVIDIA GPU). Falls back to libx264 if unavailable. """ - if not local_rtsp_url and not remote_rtsp_url: - raise ValueError( - "At least one of local_rtsp_url or remote_rtsp_url must be provided." - ) + if ( + not local_rtsp_url + and not remote_rtsp_url + and not raw_local_rtsp_url + and not raw_remote_rtsp_url + ): + raise ValueError("At least one RTSP URL must be provided.") self.width = int(width) self.height = int(height) self.estimated_fps = float(estimated_fps) self.local_rtsp_url = local_rtsp_url self.remote_rtsp_url = remote_rtsp_url + self.raw_local_rtsp_url = raw_local_rtsp_url + self.raw_remote_rtsp_url = raw_remote_rtsp_url # Check hardware encoder availability self.use_nvenc = use_hwenc and _check_nvenc_available() @@ -87,18 +98,32 @@ def __init__( self._last_log_t = time.perf_counter() self._frames_written = 0 - # Subprocess for ffmpeg + # Processed video subprocess self.process: Optional[subprocess.Popen] = None self._stderr_thread: Optional[threading.Thread] = None self._stderr_stop = threading.Event() self.restart_needed = False - self._start_process() + + # Raw video subprocess + self.raw_process: Optional[subprocess.Popen] = None + self._raw_stderr_thread: Optional[threading.Thread] = None + self._raw_stderr_stop = threading.Event() + self.raw_restart_needed = False + + # Start processes + if self.local_rtsp_url or self.remote_rtsp_url: + self._start_process() + if self.raw_local_rtsp_url or self.raw_remote_rtsp_url: + self._start_raw_process() # Queue and threading - self.frame_queue: "queue.Queue" = queue.Queue(maxsize=3) # Smaller queue + self.frame_queue: "queue.Queue" = queue.Queue(maxsize=3) + self.raw_frame_queue: "queue.Queue" = queue.Queue(maxsize=3) self.stop_event = threading.Event() self.thread = threading.Thread(target=self._writer_thread, daemon=True) self.thread.start() + self.raw_thread = threading.Thread(target=self._raw_writer_thread, daemon=True) + self.raw_thread.start() def _calculate_fps(self) -> float: """ @@ -141,8 +166,14 @@ def _should_restart_process(self, new_fps: float) -> bool: fps_change = abs(new_fps - self.current_fps) / self.current_fps return fps_change > 0.30 # >30% - def _build_ffmpeg_command(self): - """Build FFmpeg command with hardware or software encoding.""" + def _build_ffmpeg_command(self, is_raw: bool = False): + """Build FFmpeg command with hardware or software encoding. + + Parameters + ---------- + is_raw : bool + If True, build command for raw video stream, otherwise for processed video. + """ gop = max(1, int(round(self.current_fps))) # 1-second keyframe interval vbv = "2M" @@ -237,14 +268,21 @@ def _build_ffmpeg_command(self): ] # RTSP output - if self.remote_rtsp_url: + if is_raw: + local_url = self.raw_local_rtsp_url + remote_url = self.raw_remote_rtsp_url + else: + local_url = self.local_rtsp_url + remote_url = self.remote_rtsp_url + + if remote_url: tee_arg = ( - f"[f=rtsp:rtsp_transport=tcp]{self.local_rtsp_url}" - f"|[f=rtsp:rtsp_transport=tcp:onfail=ignore]{self.remote_rtsp_url}" + f"[f=rtsp:rtsp_transport=tcp]{local_url}" + f"|[f=rtsp:rtsp_transport=tcp:onfail=ignore]{remote_url}" ) cmd += ["-f", "tee", tee_arg] else: - cmd += ["-f", "rtsp", "-rtsp_transport", "tcp", self.local_rtsp_url] + cmd += ["-f", "rtsp", "-rtsp_transport", "tcp", local_url] return cmd @@ -314,6 +352,72 @@ def _start_process(self): self.process = None raise + def _start_raw_process(self): + """ + Start or restart the FFmpeg subprocess for raw video stream. + """ + # Clean up any old process + if self.raw_process: + try: + if self.raw_process.stdin and not self.raw_process.stdin.closed: + self.raw_process.stdin.close() + self.raw_process.terminate() + self.raw_process.wait(timeout=2) + except subprocess.TimeoutExpired: + self.raw_process.kill() + self.raw_process.wait() + except Exception: + pass + finally: + self.raw_process = None + + # Stop previous stderr logger if any + self._raw_stderr_stop.set() + if self._raw_stderr_thread and self._raw_stderr_thread.is_alive(): + try: + self._raw_stderr_thread.join(timeout=1.0) + except Exception: + pass + self._raw_stderr_stop.clear() + self._raw_stderr_thread = None + + # Launch fresh + cmd = self._build_ffmpeg_command(is_raw=True) + enc_type = "NVENC" if self.use_nvenc else "libx264" + logging.info( + "Starting FFmpeg (RAW, %s) with FPS: %.2f", enc_type, self.current_fps + ) + logging.info("FFmpeg RAW command: %s", " ".join(cmd)) + + try: + # Use a generous stdin buffer to reduce write stalls + self.raw_process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, # capture for logging + bufsize=self.width * self.height * 3 * 2, + ) + logging.info( + "Started FFmpeg RAW subprocess (PID: %s, encoder: %s)", + self.raw_process.pid, + enc_type, + ) + self.raw_restart_needed = False + + # Start stderr logger thread + self._raw_stderr_thread = threading.Thread( + target=self._raw_stderr_logger_loop, + name="ffmpeg-raw-stderr", + daemon=True, + ) + self._raw_stderr_thread.start() + + except Exception as e: + logging.error("Failed to start FFmpeg RAW: %s", e) + self.raw_process = None + raise + def _stderr_logger_loop(self): """ Continuously read FFmpeg stderr and log lines (helps debug exits / errors). @@ -330,15 +434,68 @@ def _stderr_logger_loop(self): except Exception as e: logging.debug("ffmpeg stderr logger ended: %s", e) + def _raw_stderr_logger_loop(self): + """ + Continuously read FFmpeg RAW stderr and log lines (helps debug exits / errors). + """ + if not self.raw_process or not self.raw_process.stderr: + return + try: + for raw in iter(self.raw_process.stderr.readline, b""): + if self._raw_stderr_stop.is_set(): + break + line = raw.decode(errors="replace").rstrip() + if line: + logging.error("ffmpeg-raw> %s", line) + except Exception as e: + logging.debug("ffmpeg raw stderr logger ended: %s", e) + def _is_process_healthy(self) -> bool: """ Check if the process is running and available. """ return self.process is not None and self.process.poll() is None + def write_raw_frame(self, frame): + """ + Queue a raw frame for sending to raw RTSP stream. + """ + if self.stop_event.is_set() or frame is None: + return + if not self.raw_local_rtsp_url and not self.raw_remote_rtsp_url: + return + + # Quick sanity check + try: + h, w = frame.shape[:2] + if w != self.width or h != self.height: + logging.warning( + "Raw frame size %dx%d != configured %dx%d; dropping", + w, + h, + self.width, + self.height, + ) + return + except Exception: + return + + try: + # Non-blocking put with drop-oldest strategy + if self.raw_frame_queue.full(): + try: + self.raw_frame_queue.get_nowait() # Drop oldest + except queue.Empty: + pass + self.raw_frame_queue.put_nowait(frame) + except queue.Full: + pass # Drop frame if still full + except Exception as e: + logging.error("Error queueing raw frame: %s", e) + def write_frame(self, frame): """ - Queue a frame for sending; tracks FPS and triggers restarts if needed. + Queue a processed frame for sending; tracks FPS and triggers restarts if needed. """ if self.stop_event.is_set() or frame is None: return @@ -453,6 +610,78 @@ def _writer_thread(self): except Exception: pass + def _raw_writer_thread(self): + """ + Thread function to write raw frames to FFmpeg subprocess. + """ + frames_written = 0 + last_log_t = time.perf_counter() + + while not self.stop_event.is_set(): + try: + frame = self.raw_frame_queue.get(timeout=1) + except queue.Empty: + continue + + # Restart if requested or if FFmpeg died + if self.raw_restart_needed or not ( + self.raw_process and self.raw_process.poll() is None + ): + logging.warning("Restarting FFmpeg RAW process...") + self._start_raw_process() + if not (self.raw_process and self.raw_process.poll() is None): + logging.error("Failed to restart FFmpeg RAW process") + continue + + # Write raw BGR24 + try: + if self.raw_process and self.raw_process.stdin: + self.raw_process.stdin.write(frame.tobytes()) + self.raw_process.stdin.flush() + frames_written += 1 + except (BrokenPipeError, OSError) as e: + logging.error("Pipe error writing raw frame: %s", e) + self.raw_restart_needed = True + except Exception as e: + logging.error("Unexpected error writing raw frame: %s", e) + + # Periodic throughput log + now = time.perf_counter() + if now - last_log_t >= self._log_interval: + elapsed = now - last_log_t + fps = frames_written / elapsed if elapsed > 0 else 0.0 + qsz = self.raw_frame_queue.qsize() + enc = "nvenc" if self.use_nvenc else "x264" + logging.info( + "[RTSP RAW out] ~%.1f fps (queue=%d, enc=%s)", fps, qsz, enc + ) + last_log_t = now + frames_written = 0 + + # Teardown + if self.raw_process: + try: + if self.raw_process.stdin and not self.raw_process.stdin.closed: + self.raw_process.stdin.close() + self.raw_process.terminate() + try: + self.raw_process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.raw_process.kill() + self.raw_process.wait() + except Exception: + pass + finally: + self.raw_process = None + + # Stop stderr logger + self._raw_stderr_stop.set() + if self._raw_stderr_thread and self._raw_stderr_thread.is_alive(): + try: + self._raw_stderr_thread.join(timeout=1.0) + except Exception: + pass + def get_current_fps(self) -> float: """ Get the current measured FPS. @@ -469,6 +698,11 @@ def stop(self): except Exception: pass + try: + self.raw_thread.join(timeout=5) + except Exception: + pass + if self.process: try: if self.process.stdin and not self.process.stdin.closed: @@ -491,3 +725,26 @@ def stop(self): self._stderr_thread.join(timeout=1.0) except Exception: pass + + if self.raw_process: + try: + if self.raw_process.stdin and not self.raw_process.stdin.closed: + self.raw_process.stdin.close() + self.raw_process.terminate() + try: + self.raw_process.wait(timeout=3) + except subprocess.TimeoutExpired: + self.raw_process.kill() + self.raw_process.wait() + except Exception: + pass + finally: + self.raw_process = None + + # Stop raw stderr logger + self._raw_stderr_stop.set() + if self._raw_stderr_thread and self._raw_stderr_thread.is_alive(): + try: + self._raw_stderr_thread.join(timeout=1.0) + except Exception: + pass diff --git a/src/om1_vlm/anonymizationSys/face_recog_stream/run.py b/src/om1_vlm/anonymizationSys/face_recog_stream/run.py index 3691a72..964c819 100644 --- a/src/om1_vlm/anonymizationSys/face_recog_stream/run.py +++ b/src/om1_vlm/anonymizationSys/face_recog_stream/run.py @@ -214,8 +214,8 @@ def main() -> None: script_dir = os.path.dirname(os.path.abspath(__file__)) models_dir = os.path.join(script_dir, "..", "models") - scrfd_name = "om1-modules_scrfd_2.5g_bnkps_shape640x640.engine" - arc_name = "om1-modules_buffalo_m_w600k_r50.engine" + scrfd_name = "thor_scrfd_2.5g_bnkps_shape640x640.engine" + arc_name = "thor_buffalo_m_w600k_r50.engine" pose_name = "yolo11s-pose.engine" default_scrfd_engine = os.path.join(models_dir, scrfd_name) @@ -388,6 +388,15 @@ def main() -> None: "--remote-rtsp", help="Remote RTSP URL to relay (e.g. rtsp://host:8554/top_camera).", ) + ap.add_argument( + "--raw-local-rtsp", + default="rtsp://localhost:8554/top_camera_raw", + help="RTSP URL to publish raw video (e.g. rtsp://host:8554/top_camera_raw).", + ) + ap.add_argument( + "--raw-remote-rtsp", + help="Remote RTSP URL to relay raw video.", + ) # UI / perf ap.add_argument( @@ -507,13 +516,23 @@ def main() -> None: int(cap.fps), args.local_rtsp, args.remote_rtsp, - ) - logger.info( - "Publishing RTSP: local=%s%s", - args.local_rtsp, - f" remote={args.remote_rtsp}" if args.remote_rtsp else "", + args.raw_local_rtsp, + args.raw_remote_rtsp, ) + if args.local_rtsp or args.remote_rtsp: + logger.info( + "Publishing processed RTSP: local=%s%s", + args.local_rtsp or "(none)", + f" remote={args.remote_rtsp}" if args.remote_rtsp else "", + ) + if args.raw_local_rtsp or args.raw_remote_rtsp: + logger.info( + "Publishing raw RTSP: local=%s%s", + args.raw_local_rtsp or "(none)", + f" remote={args.raw_remote_rtsp}" if args.raw_remote_rtsp else "", + ) + # Who tracking who = WhoTracker(lookback_sec=args.http_lookback_sec) @@ -669,6 +688,9 @@ def handle_sigint(_sig, _frame): time.sleep(0.02) continue + # Publish raw video + rstp_writer.write_raw_frame(frame) + # Snapshot config with cfg_lock: do_blur = bool(cfg["blur"])