Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 273 additions & 16 deletions src/om1_vlm/anonymizationSys/face_recog_stream/rtsp_video_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def __init__(
estimated_fps: int = 30,
local_rtsp_url: Optional[str] = "rtsp://localhost:8554/live",
remote_rtsp_url: Optional[str] = None,
raw_local_rtsp_url: Optional[str] = None,
raw_remote_rtsp_url: Optional[str] = None,
use_hwenc: bool = True, # NEW: prefer hardware encoding
):
"""
Expand All @@ -51,23 +53,32 @@ def __init__(
estimated_fps : int, optional
Estimated frames per second for the video stream, by default 15.
local_rtsp_url : Optional[str], optional
Local RTSP URL to stream to, by default "rtsp://localhost:8554/live".
Local RTSP URL to stream processed video to, by default "rtsp://localhost:8554/live".
remote_rtsp_url : Optional[str], optional
Remote RTSP URL to stream to, by default None.
Remote RTSP URL to stream processed video to, by default None.
raw_local_rtsp_url : Optional[str], optional
Local RTSP URL to stream raw video to, by default None.
raw_remote_rtsp_url : Optional[str], optional
Remote RTSP URL to stream raw video to, by default None.
use_hwenc : bool
If True, try to use NVENC hardware encoder (Jetson/NVIDIA GPU).
Falls back to libx264 if unavailable.
"""
if not local_rtsp_url and not remote_rtsp_url:
raise ValueError(
"At least one of local_rtsp_url or remote_rtsp_url must be provided."
)
if (
not local_rtsp_url
and not remote_rtsp_url
and not raw_local_rtsp_url
and not raw_remote_rtsp_url
):
raise ValueError("At least one RTSP URL must be provided.")

self.width = int(width)
self.height = int(height)
self.estimated_fps = float(estimated_fps)
self.local_rtsp_url = local_rtsp_url
self.remote_rtsp_url = remote_rtsp_url
self.raw_local_rtsp_url = raw_local_rtsp_url
self.raw_remote_rtsp_url = raw_remote_rtsp_url

# Check hardware encoder availability
self.use_nvenc = use_hwenc and _check_nvenc_available()
Expand All @@ -87,18 +98,32 @@ def __init__(
self._last_log_t = time.perf_counter()
self._frames_written = 0

# Subprocess for ffmpeg
# Processed video subprocess
self.process: Optional[subprocess.Popen] = None
self._stderr_thread: Optional[threading.Thread] = None
self._stderr_stop = threading.Event()
self.restart_needed = False
self._start_process()

# Raw video subprocess
self.raw_process: Optional[subprocess.Popen] = None
self._raw_stderr_thread: Optional[threading.Thread] = None
self._raw_stderr_stop = threading.Event()
self.raw_restart_needed = False

# Start processes
if self.local_rtsp_url or self.remote_rtsp_url:
self._start_process()
if self.raw_local_rtsp_url or self.raw_remote_rtsp_url:
self._start_raw_process()

# Queue and threading
self.frame_queue: "queue.Queue" = queue.Queue(maxsize=3) # Smaller queue
self.frame_queue: "queue.Queue" = queue.Queue(maxsize=3)
self.raw_frame_queue: "queue.Queue" = queue.Queue(maxsize=3)
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self._writer_thread, daemon=True)
self.thread.start()
self.raw_thread = threading.Thread(target=self._raw_writer_thread, daemon=True)
self.raw_thread.start()

def _calculate_fps(self) -> float:
"""
Expand Down Expand Up @@ -141,8 +166,14 @@ def _should_restart_process(self, new_fps: float) -> bool:
fps_change = abs(new_fps - self.current_fps) / self.current_fps
return fps_change > 0.30 # >30%

def _build_ffmpeg_command(self):
"""Build FFmpeg command with hardware or software encoding."""
def _build_ffmpeg_command(self, is_raw: bool = False):
"""Build FFmpeg command with hardware or software encoding.

Parameters
----------
is_raw : bool
If True, build command for raw video stream, otherwise for processed video.
"""
gop = max(1, int(round(self.current_fps))) # 1-second keyframe interval
vbv = "2M"

Expand Down Expand Up @@ -237,14 +268,21 @@ def _build_ffmpeg_command(self):
]

# RTSP output
if self.remote_rtsp_url:
if is_raw:
local_url = self.raw_local_rtsp_url
remote_url = self.raw_remote_rtsp_url
else:
local_url = self.local_rtsp_url
remote_url = self.remote_rtsp_url

if remote_url:
tee_arg = (
f"[f=rtsp:rtsp_transport=tcp]{self.local_rtsp_url}"
f"|[f=rtsp:rtsp_transport=tcp:onfail=ignore]{self.remote_rtsp_url}"
f"[f=rtsp:rtsp_transport=tcp]{local_url}"
f"|[f=rtsp:rtsp_transport=tcp:onfail=ignore]{remote_url}"
)
cmd += ["-f", "tee", tee_arg]
else:
cmd += ["-f", "rtsp", "-rtsp_transport", "tcp", self.local_rtsp_url]
cmd += ["-f", "rtsp", "-rtsp_transport", "tcp", local_url]

return cmd

Expand Down Expand Up @@ -314,6 +352,72 @@ def _start_process(self):
self.process = None
raise

def _start_raw_process(self):
"""
Start or restart the FFmpeg subprocess for raw video stream.
"""
# Clean up any old process
if self.raw_process:
try:
if self.raw_process.stdin and not self.raw_process.stdin.closed:
self.raw_process.stdin.close()
self.raw_process.terminate()
self.raw_process.wait(timeout=2)
except subprocess.TimeoutExpired:
self.raw_process.kill()
self.raw_process.wait()
except Exception:
pass
finally:
self.raw_process = None

# Stop previous stderr logger if any
self._raw_stderr_stop.set()
if self._raw_stderr_thread and self._raw_stderr_thread.is_alive():
try:
self._raw_stderr_thread.join(timeout=1.0)
except Exception:
pass
self._raw_stderr_stop.clear()
self._raw_stderr_thread = None

# Launch fresh
cmd = self._build_ffmpeg_command(is_raw=True)
enc_type = "NVENC" if self.use_nvenc else "libx264"
logging.info(
"Starting FFmpeg (RAW, %s) with FPS: %.2f", enc_type, self.current_fps
)
logging.info("FFmpeg RAW command: %s", " ".join(cmd))

try:
# Use a generous stdin buffer to reduce write stalls
self.raw_process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE, # capture for logging
bufsize=self.width * self.height * 3 * 2,
)
logging.info(
"Started FFmpeg RAW subprocess (PID: %s, encoder: %s)",
self.raw_process.pid,
enc_type,
)
self.raw_restart_needed = False

# Start stderr logger thread
self._raw_stderr_thread = threading.Thread(
target=self._raw_stderr_logger_loop,
name="ffmpeg-raw-stderr",
daemon=True,
)
self._raw_stderr_thread.start()

except Exception as e:
logging.error("Failed to start FFmpeg RAW: %s", e)
self.raw_process = None
raise

def _stderr_logger_loop(self):
"""
Continuously read FFmpeg stderr and log lines (helps debug exits / errors).
Expand All @@ -330,15 +434,68 @@ def _stderr_logger_loop(self):
except Exception as e:
logging.debug("ffmpeg stderr logger ended: %s", e)

def _raw_stderr_logger_loop(self):
"""
Continuously read FFmpeg RAW stderr and log lines (helps debug exits / errors).
"""
if not self.raw_process or not self.raw_process.stderr:
return
try:
for raw in iter(self.raw_process.stderr.readline, b""):
if self._raw_stderr_stop.is_set():
break
line = raw.decode(errors="replace").rstrip()
if line:
logging.error("ffmpeg-raw> %s", line)
except Exception as e:
logging.debug("ffmpeg raw stderr logger ended: %s", e)

def _is_process_healthy(self) -> bool:
"""
Check if the process is running and available.
"""
return self.process is not None and self.process.poll() is None

def write_raw_frame(self, frame):
"""
Queue a raw frame for sending to raw RTSP stream.
"""
if self.stop_event.is_set() or frame is None:
return
if not self.raw_local_rtsp_url and not self.raw_remote_rtsp_url:
return

# Quick sanity check
try:
h, w = frame.shape[:2]
if w != self.width or h != self.height:
logging.warning(
"Raw frame size %dx%d != configured %dx%d; dropping",
w,
h,
self.width,
self.height,
)
return
except Exception:
return

try:
# Non-blocking put with drop-oldest strategy
if self.raw_frame_queue.full():
try:
self.raw_frame_queue.get_nowait() # Drop oldest
except queue.Empty:
pass
self.raw_frame_queue.put_nowait(frame)
except queue.Full:
pass # Drop frame if still full
except Exception as e:
logging.error("Error queueing raw frame: %s", e)

def write_frame(self, frame):
"""
Queue a frame for sending; tracks FPS and triggers restarts if needed.
Queue a processed frame for sending; tracks FPS and triggers restarts if needed.
"""
if self.stop_event.is_set() or frame is None:
return
Expand Down Expand Up @@ -453,6 +610,78 @@ def _writer_thread(self):
except Exception:
pass

def _raw_writer_thread(self):
"""
Thread function to write raw frames to FFmpeg subprocess.
"""
frames_written = 0
last_log_t = time.perf_counter()

while not self.stop_event.is_set():
try:
frame = self.raw_frame_queue.get(timeout=1)
except queue.Empty:
continue

# Restart if requested or if FFmpeg died
if self.raw_restart_needed or not (
self.raw_process and self.raw_process.poll() is None
):
logging.warning("Restarting FFmpeg RAW process...")
self._start_raw_process()
if not (self.raw_process and self.raw_process.poll() is None):
logging.error("Failed to restart FFmpeg RAW process")
continue

# Write raw BGR24
try:
if self.raw_process and self.raw_process.stdin:
self.raw_process.stdin.write(frame.tobytes())
self.raw_process.stdin.flush()
frames_written += 1
except (BrokenPipeError, OSError) as e:
logging.error("Pipe error writing raw frame: %s", e)
self.raw_restart_needed = True
except Exception as e:
logging.error("Unexpected error writing raw frame: %s", e)

# Periodic throughput log
now = time.perf_counter()
if now - last_log_t >= self._log_interval:
elapsed = now - last_log_t
fps = frames_written / elapsed if elapsed > 0 else 0.0
qsz = self.raw_frame_queue.qsize()
enc = "nvenc" if self.use_nvenc else "x264"
logging.info(
"[RTSP RAW out] ~%.1f fps (queue=%d, enc=%s)", fps, qsz, enc
)
last_log_t = now
frames_written = 0

# Teardown
if self.raw_process:
try:
if self.raw_process.stdin and not self.raw_process.stdin.closed:
self.raw_process.stdin.close()
self.raw_process.terminate()
try:
self.raw_process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.raw_process.kill()
self.raw_process.wait()
except Exception:
pass
finally:
self.raw_process = None

# Stop stderr logger
self._raw_stderr_stop.set()
if self._raw_stderr_thread and self._raw_stderr_thread.is_alive():
try:
self._raw_stderr_thread.join(timeout=1.0)
except Exception:
pass

def get_current_fps(self) -> float:
"""
Get the current measured FPS.
Expand All @@ -469,6 +698,11 @@ def stop(self):
except Exception:
pass

try:
self.raw_thread.join(timeout=5)
except Exception:
pass

if self.process:
try:
if self.process.stdin and not self.process.stdin.closed:
Expand All @@ -491,3 +725,26 @@ def stop(self):
self._stderr_thread.join(timeout=1.0)
except Exception:
pass

if self.raw_process:
try:
if self.raw_process.stdin and not self.raw_process.stdin.closed:
self.raw_process.stdin.close()
self.raw_process.terminate()
try:
self.raw_process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.raw_process.kill()
self.raw_process.wait()
except Exception:
pass
finally:
self.raw_process = None

# Stop raw stderr logger
self._raw_stderr_stop.set()
if self._raw_stderr_thread and self._raw_stderr_thread.is_alive():
try:
self._raw_stderr_thread.join(timeout=1.0)
except Exception:
pass
Loading
Loading