-
-
Notifications
You must be signed in to change notification settings - Fork 72
Open
Description
I am using GStreamer (Java bindings) inside a Spring Boot service to process an RTMP stream.
The pipeline contains two appsink elements (one for detection and one for recording).
When I attempt to stop and dispose the pipeline, the entire Spring Boot application crashes without throwing any Java exception. It looks like a native-level crash (JVM exits abruptly).
I would like to understand why this happens and what is the correct way to safely stop a pipeline in Java.
Java version: 17
Spring Boot version: 3.0.2
gstreamer-java / gst1-java-core version:
GStreamer version (gst-launch-1.0 --version):
OS: Windows
private void startPipeline() {
// 固定 RTMP/H264 管道(主码流降级):rtmpsrc -> flvdemux.video -> h264parse(config-interval=1,byte-stream) -> tee
// 1) 分支:avdec_h264(max-threads=1) -> videoconvert -> videoscale+videorate(704x576@25) -> BGR -> appsink_detect(AI)
// 2) 分支:直接复用源 H264 -> appsink_record(避免本地重编码,降低 CPU)
// 说明:检测分支降分辨率与帧率,录制分支不重编码;所有 queue 设置 leaky,避免 70 路时阻塞。
String desc = String.format(
"rtmpsrc location=%s ! "
+ "queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=2 ! flvdemux name=demux "
+ "demux.video ! queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=2 "
+ "! h264parse config-interval=1 ! video/x-h264,stream-format=byte-stream,alignment=au ! tee name=t "
+ "t. ! queue max-size-buffers=4 max-size-bytes=0 max-size-time=0 leaky=2 "
+ "! avdec_h264 max-threads=1 ! videoconvert ! videoscale ! videorate "
+ "! video/x-raw,format=BGR,width=704,height=576,framerate=25/1 ! appsink name=appsink_detect "
+ "t. ! queue max-size-buffers=50 max-size-bytes=0 max-size-time=0 leaky=2 "
+ "! appsink name=appsink_record drop=false emit-signals=true",
config.getStreamUrl()
);
log.info("构建 GStreamer RTMP pipeline: {}", desc);
pipeline = (Pipeline) Gst.parseLaunch(desc);
// 检测用 appsink
detectSink = (AppSink) pipeline.getElementByName("appsink_detect");
detectSink.set("emit-signals", true);
detectSink.set("max-buffers", 2);
detectSink.set("drop", true);
detectSink.connect((AppSink.NEW_SAMPLE) sink -> {
Sample sample = sink.pullSample();
if (sample == null) return FlowReturn.OK;
Buffer buffer = sample.getBuffer();
ByteBuffer bb = buffer.map(false);
if (bb != null) {
frameHandler.onFrame(config.getCameraId(), bb, sample);
buffer.unmap();
}
sample.dispose(); // 避免泄露
return FlowReturn.OK;
});
// 录像用 appsink(H264 编码)
recordSink = (AppSink) pipeline.getElementByName("appsink_record");
recordSink.set("emit-signals", true);
recordSink.set("max-buffers", 50);
recordSink.set("drop", false); // 尽量不丢,保证视频连续性
recordSink.connect((AppSink.NEW_SAMPLE) sink -> {
Sample sample = sink.pullSample();
if (sample == null) return FlowReturn.OK;
Buffer buffer = sample.getBuffer();
ByteBuffer bb = buffer.map(false);
if (bb != null) {
byte[] data = new byte[bb.remaining()];
bb.get(data);
long pts = buffer.getPresentationTimestamp(); // 时间戳(纳秒),可换算为毫秒
long ptsMs = pts == ClockTime.NONE ? -1 : pts / 1_000_000;
EncodedFrame frame = new EncodedFrame(data, ptsMs);
eventManager.onEncodedFrame(frame);
buffer.unmap();
}
sample.dispose();
return FlowReturn.OK;
});
// 监听错误和 EOS
Bus bus = pipeline.getBus();
bus.connect((Bus.ERROR) (source, err, debug) -> {
System.err.println("GStreamer ERROR: " + err + ", debug=" + debug);
stopPipeline();
});
bus.connect((Bus.EOS) (source) -> {
log.info("GStreamer EOS(流结束): {}", config.getCameraId());
stopPipeline();
});
pipeline.setState(State.PLAYING);
State playingState = pipeline.getState(ClockTime.fromSeconds(15));
if (playingState != State.PLAYING) {
System.err.println("摄像头 " + config.getCameraId() + " pipeline 未切到 PLAYING,当前=" + playingState);
}
retryDelayMs = 2000; // 重置重试间隔
log.info("摄像头 {} pipeline 已启动", config.getCameraId());
}
private void stopPipeline() {
synchronized (pipelineLock) {
if (stopping) {
return;
}
stopping = true;
running = false;
try {
// Disable callbacks first to avoid native callback accessing released objects
if (detectSink != null) {
detectSink.set("emit-signals", false);
}
if (recordSink != null) {
recordSink.set("emit-signals", false);
}
if (pipeline != null) {
Pipeline current = pipeline;
pipeline = null;
try {
current.setState(State.PAUSED);
current.getState(ClockTime.NONE);
} catch (Exception ignored) { }
current.setState(State.NULL);
current.getState(ClockTime.NONE);
// The crash seems to happen here or around this moment
current.dispose();
}
} catch (Exception ignored) {
ignored.printStackTrace();
} finally {
detectSink = null;
recordSink = null;
stopping = false;
}
}
}```
Metadata
Metadata
Assignees
Labels
No labels